From 86b5392cb830a987e236b888774c60419c1d2c4f Mon Sep 17 00:00:00 2001 From: Robert Rossmann Date: Tue, 12 Mar 2024 23:56:36 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20final=20round=20of=20refactoring,=20rea?= =?UTF-8?q?dme=20overhaul=20=F0=9F=8E=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- background.go | 7 +-- readme.md | 134 +++++++++++++++++++++++++++++-------------- samples/slog/slog.go | 3 +- task/logvaluer.go | 33 +++++++++++ task/task.go | 37 ++---------- 5 files changed, 135 insertions(+), 79 deletions(-) create mode 100644 task/logvaluer.go diff --git a/background.go b/background.go index 01f19c6..e19e548 100644 --- a/background.go +++ b/background.go @@ -9,7 +9,6 @@ import ( "go.strv.io/background/task" "github.com/kamilsk/retry/v5" - "github.com/kamilsk/retry/v5/strategy" ) // Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their @@ -18,7 +17,7 @@ import ( type Manager struct { stalledThreshold time.Duration observer observer.Observer - retry []strategy.Strategy + retry task.Retry taskmgr taskmgr loopmgr loopmgr } @@ -32,8 +31,8 @@ type Options struct { // schedule. These are useful for logging, monitoring, etc. Observer observer.Observer // Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several - // strategies are provided by github.com/kamilsk/retry/v5/strategy package. - Retry []strategy.Strategy + // strategies are provided by https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package. + Retry task.Retry } // NewManager creates a new instance of Manager with default options and no observer. diff --git a/readme.md b/readme.md index f8820ac..89d30aa 100644 --- a/readme.md +++ b/readme.md @@ -1,16 +1,16 @@ -# `go.strv.io/background` +
+

go.strv.io/background

-[![Tests][badge-tests]][workflow-tests] [![codecov][badge-codecov]][codecov-dashboard] + [![Continuous Integration][badge-ci]][workflow-ci] [![codecov][badge-codecov]][codecov-dashboard] -> A package that keeps track of goroutines and allows you to wait for them to finish when it's time to shut down your application. + > Never lose your goroutine again.
Built with ❤️ at [STRV](https://www.strv.com) +
-## Purpose +## About -In Go, when the `main` function returns, any pending goroutines are terminated. This means that we need to keep track of them somehow so that `main` can wait for them to finish before returning. This is also useful in the context of servers - when the server receives a terminating signal from the host OS (ie. due to a new release being deployed) the application needs a way to delay the shutdown long enough for the goroutines to finish before allowing itself to be terminated. +This package provides mechanism to easily run a task (a function) in a goroutine and provides mechanisms to wait for all tasks to finish (a synchronisation point). Addiionally, you can attach an observer to the background manager and get notified when something interesting happens to your task - like when it finishes, errors or takes too long to complete. This allows you to centralise your error reporting and logging. -This library makes that management process easier and adds some extra functionality on top, for good measure. - -> ⚠️ By no means is this a replacement for proper job queue system! The intended use case is for small, relatively fast functions that either do the actual work or schedule a job in some kind of a queue to do that work. Since even putting a job into a queue takes some time, you can remove that time from the client's request/response cycle and make your backend respond faster. +The purpose of the central synchronisation point for all your background goroutines is to make sure that your application does not exit before all goroutines have finished. You can trigger the synchronisation when your application receives a terminating signal, for example, and wait for all tasks to finish before allowing the application to exit. ## Installation @@ -20,54 +20,104 @@ go get go.strv.io/background ## Usage +There are two types of tasks you can schedule: + +- one-off: a task that runs once and then finishes +- looping: a task that runs repeatedly until it is stopped + +One-off tasks are great for triggering a single operation in the background, like sending an email or processing a file. Looping tasks are great for running a background worker that processes a queue of items, like reading from an AWS SQS queue periodically. + +Additionally, each task can define its retry policies, which allows you to automatically retry the task if it fails. For one-off tasks, the task is repeated until its retry policy says it should stop, then the task is considered finished. For looping tasks, the retry policy is applied to each iteration of the task - upon failure, the task is retried using its defined retry policy and when the policy says it should stop, the task continues on to the next iteration and the process repeats. + +### Initialising the manager + ```go package main import ( - "context" - "fmt" + "context" - "go.strv.io/background" + "go.strv.io/background" + "go.strv.io/background/observer" ) -// Define a type for the metadata that you want to associate with your tasks. -// The metadata is provided by the caller when a task is scheduled and is passed -// to the monitoring functions. -type TaskMetadata string - func main() { - // Create a new background manager - manager := background.NewManager[TaskMetadata]() - // Define some monitoring functions for logging or error reporting - manager.OnTaskAdded = func(ctx context.Context, meta TaskMetadata) { - fmt.Println("Task added:", meta) - } - manager.OnTaskSucceeded = func(ctx context.Context, meta TaskMetadata) { - fmt.Println("Task succeeded:", meta) - } - manager.OnTaskFailed = func(ctx context.Context, meta TaskMetadata, err error) { - fmt.Println("Task failed:", meta, err) - } - - // ... elsewhere in your codebase - manager.Run(context.Background(), "goroutine-1", func(ctx context.Context) error { - // Do some work here - return nil - }) - - - // Wait for all goroutines to finish - // Make sure you stop your components from adding more tasks - manager.Wait() - // Now it's safe to terminate the process + manager := background.NewManagerWithOptions(background.Options{ + // Use one of the provided observers that prints logs to the console using log/slog + // Feel free to implement your own. + Observer: observer.Slog{}, + }) + + // Share the manager with the rest of your application + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + <-interrupt + // Wait for all tasks to finish, then allow the application to exit + manager.Close() +} +``` + +### Scheduling tasks + +```go +import ( + "time" + + "go.strv.io/background" + "go.strv.io/background/task" + "github.com/kamilsk/retry/v5/strategy" +) + +maanger := background.NewManagerWithOptions(background.Options{ + Observer: observer.Slog{}, +}) + +// Schedule a one-off task - the task will run only once (except if it fails and has a retry policy) +oneoff := task.Task{ + Type: task.TypeOneOff, + Fn: func(ctx context.Context) error { + // Do something interesting... + <-time.After(3 * time.Second) + return nil + }, + Retry: task.Retry{ + strategy.Limit(3), + } +} + +manager.RunTask(context.Background(), oneoff) + +// Schedule a looping task - the task will run repeatedly until it is stopped +looping := task.Task{ + Type: task.TypeLoop, + Fn: func(ctx context.Context) error { + // Do something interesting... + <-time.After(3 * time.Second) + return nil + }, + Retry: task.Retry{ + strategy.Limit(3), + } } + +// Schedule the task to be continuously run in an infinite loop until manager.Close() is called +manager.RunTask(context.Background(), looping) +``` + +## Examples + +You can find a sample executable in the [samples](samples) folder. To run them, clone the repository and run: + +```sh +go run samples/slog/slog.go ``` ## License See the [LICENSE](LICENSE) file for details. -[badge-tests]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/test.yaml/badge.svg -[workflow-tests]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/test.yaml +[badge-ci]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/ci.yaml/badge.svg +[workflow-ci]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/ci.yaml [badge-codecov]: https://codecov.io/gh/strvcom/strv-backend-go-background/graph/badge.svg?token=ST3JD5GCRN [codecov-dashboard]: https://codecov.io/gh/strvcom/strv-backend-go-background diff --git a/samples/slog/slog.go b/samples/slog/slog.go index 8278e84..c9ecc0e 100644 --- a/samples/slog/slog.go +++ b/samples/slog/slog.go @@ -23,11 +23,12 @@ var ( func main() { // Customise the default logger to output JSON slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil))) + slog.Info("application starting - press Ctrl+C to terminate") manager := background.NewManagerWithOptions(background.Options{ // Use the provided Slog observer to save some development time Observer: observer.Slog{}, - Retry: []strategy.Strategy{ + Retry: task.Retry{ strategy.Limit(1), }, }) diff --git a/task/logvaluer.go b/task/logvaluer.go new file mode 100644 index 0000000..2043116 --- /dev/null +++ b/task/logvaluer.go @@ -0,0 +1,33 @@ +package task + +import "log/slog" + +// LogValue implements slog.LogValuer. +func (t Type) LogValue() slog.Value { + switch t { + case TypeOneOff: + return slog.StringValue("oneoff") + case TypeLoop: + return slog.StringValue("loop") + default: + return slog.StringValue("invalid") + } +} + +// LogValue implements slog.LogValuer. +func (definition Task) LogValue() slog.Value { + return slog.GroupValue( + slog.Any("type", definition.Type), + slog.Any("meta", definition.Meta), + ) +} + +// LogValue implements slog.LogValuer. +func (meta Metadata) LogValue() slog.Value { + values := make([]slog.Attr, 0, len(meta)) + for key, value := range meta { + values = append(values, slog.String(key, value)) + } + + return slog.GroupValue(values...) +} diff --git a/task/task.go b/task/task.go index 74a7f19..fd2cbd5 100644 --- a/task/task.go +++ b/task/task.go @@ -3,7 +3,6 @@ package task import ( "context" "errors" - "log/slog" "github.com/kamilsk/retry/v5/strategy" ) @@ -19,18 +18,6 @@ const ( TypeLoop ) -// LogValue implements slog.LogValuer. -func (t Type) LogValue() slog.Value { - switch t { - case TypeOneOff: - return slog.StringValue("oneoff") - case TypeLoop: - return slog.StringValue("loop") - default: - return slog.StringValue("invalid") - } -} - var ( // ErrUnknownType is returned when the task type is not a valid value of Type. ErrUnknownType = errors.New("unknown task type") @@ -47,16 +34,8 @@ type Task struct { Meta Metadata // Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry // strategies you might have configured in the Manager. Several strategies are provided by - // github.com/kamilsk/retry/v5/strategy package. - Retry []strategy.Strategy -} - -// LogValue implements slog.LogValuer. -func (definition Task) LogValue() slog.Value { - return slog.GroupValue( - slog.Any("type", definition.Type), - slog.Any("meta", definition.Meta), - ) + // https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package. + Retry Retry } // Fn is the function to be executed in a goroutine. @@ -66,12 +45,6 @@ type Fn func(ctx context.Context) error // methods to help you identify the task or get more context about it. type Metadata map[string]string -// LogValue implements slog.LogValuer. -func (meta Metadata) LogValue() slog.Value { - values := make([]slog.Attr, 0, len(meta)) - for key, value := range meta { - values = append(values, slog.String(key, value)) - } - - return slog.GroupValue(values...) -} +// Retry defines how the task should be retried in case of failure (if at all). Several strategies are provided by +// https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package. +type Retry []strategy.Strategy