Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: final round of refactoring, readme overhaul 🎓 #13

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.strv.io/background/task"

"github.com/kamilsk/retry/v5"
"github.com/kamilsk/retry/v5/strategy"
)

// Manager keeps track of scheduled goroutines and provides mechanisms to wait for them to finish or cancel their
Expand All @@ -18,7 +17,7 @@ import (
type Manager struct {
stalledThreshold time.Duration
observer observer.Observer
retry []strategy.Strategy
retry task.Retry
taskmgr taskmgr
loopmgr loopmgr
}
Expand All @@ -32,8 +31,8 @@ type Options struct {
// schedule. These are useful for logging, monitoring, etc.
Observer observer.Observer
// Retry defines the default retry strategies that will be used for all tasks unless overridden by the task. Several
// strategies are provided by github.com/kamilsk/retry/v5/strategy package.
Retry []strategy.Strategy
// strategies are provided by https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package.
Retry task.Retry
}

// NewManager creates a new instance of Manager with default options and no observer.
Expand Down
134 changes: 92 additions & 42 deletions readme.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# `go.strv.io/background`
<div align="center">
<h1><code>go.strv.io/background</code></h1>

[![Tests][badge-tests]][workflow-tests] [![codecov][badge-codecov]][codecov-dashboard]
[![Continuous Integration][badge-ci]][workflow-ci] [![codecov][badge-codecov]][codecov-dashboard]

> A package that keeps track of goroutines and allows you to wait for them to finish when it's time to shut down your application.
> Never lose your goroutine again.<br />Built with ❤️ at [STRV](https://www.strv.com)
</div>
## Purpose
## About

In Go, when the `main` function returns, any pending goroutines are terminated. This means that we need to keep track of them somehow so that `main` can wait for them to finish before returning. This is also useful in the context of servers - when the server receives a terminating signal from the host OS (ie. due to a new release being deployed) the application needs a way to delay the shutdown long enough for the goroutines to finish before allowing itself to be terminated.
This package provides mechanism to easily run a task (a function) in a goroutine and provides mechanisms to wait for all tasks to finish (a synchronisation point). Addiionally, you can attach an observer to the background manager and get notified when something interesting happens to your task - like when it finishes, errors or takes too long to complete. This allows you to centralise your error reporting and logging.

This library makes that management process easier and adds some extra functionality on top, for good measure.

> ⚠️ By no means is this a replacement for proper job queue system! The intended use case is for small, relatively fast functions that either do the actual work or schedule a job in some kind of a queue to do that work. Since even putting a job into a queue takes some time, you can remove that time from the client's request/response cycle and make your backend respond faster.
The purpose of the central synchronisation point for all your background goroutines is to make sure that your application does not exit before all goroutines have finished. You can trigger the synchronisation when your application receives a terminating signal, for example, and wait for all tasks to finish before allowing the application to exit.

## Installation

Expand All @@ -20,54 +20,104 @@ go get go.strv.io/background

## Usage

There are two types of tasks you can schedule:

- one-off: a task that runs once and then finishes
- looping: a task that runs repeatedly until it is stopped

One-off tasks are great for triggering a single operation in the background, like sending an email or processing a file. Looping tasks are great for running a background worker that processes a queue of items, like reading from an AWS SQS queue periodically.

Additionally, each task can define its retry policies, which allows you to automatically retry the task if it fails. For one-off tasks, the task is repeated until its retry policy says it should stop, then the task is considered finished. For looping tasks, the retry policy is applied to each iteration of the task - upon failure, the task is retried using its defined retry policy and when the policy says it should stop, the task continues on to the next iteration and the process repeats.

### Initialising the manager

```go
package main

import (
"context"
"fmt"
"context"

"go.strv.io/background"
"go.strv.io/background"
"go.strv.io/background/observer"
)

// Define a type for the metadata that you want to associate with your tasks.
// The metadata is provided by the caller when a task is scheduled and is passed
// to the monitoring functions.
type TaskMetadata string

func main() {
// Create a new background manager
manager := background.NewManager[TaskMetadata]()
// Define some monitoring functions for logging or error reporting
manager.OnTaskAdded = func(ctx context.Context, meta TaskMetadata) {
fmt.Println("Task added:", meta)
}
manager.OnTaskSucceeded = func(ctx context.Context, meta TaskMetadata) {
fmt.Println("Task succeeded:", meta)
}
manager.OnTaskFailed = func(ctx context.Context, meta TaskMetadata, err error) {
fmt.Println("Task failed:", meta, err)
}

// ... elsewhere in your codebase
manager.Run(context.Background(), "goroutine-1", func(ctx context.Context) error {
// Do some work here
return nil
})


// Wait for all goroutines to finish
// Make sure you stop your components from adding more tasks
manager.Wait()
// Now it's safe to terminate the process
manager := background.NewManagerWithOptions(background.Options{
// Use one of the provided observers that prints logs to the console using log/slog
// Feel free to implement your own.
Observer: observer.Slog{},
})

// Share the manager with the rest of your application

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
// Wait for all tasks to finish, then allow the application to exit
manager.Close()
}
```

### Scheduling tasks

```go
import (
"time"

"go.strv.io/background"
"go.strv.io/background/task"
"github.com/kamilsk/retry/v5/strategy"
)

maanger := background.NewManagerWithOptions(background.Options{
Observer: observer.Slog{},
})

// Schedule a one-off task - the task will run only once (except if it fails and has a retry policy)
oneoff := task.Task{
Type: task.TypeOneOff,
Fn: func(ctx context.Context) error {
// Do something interesting...
<-time.After(3 * time.Second)
return nil
},
Retry: task.Retry{
strategy.Limit(3),
}
}

manager.RunTask(context.Background(), oneoff)

// Schedule a looping task - the task will run repeatedly until it is stopped
looping := task.Task{
Type: task.TypeLoop,
Fn: func(ctx context.Context) error {
// Do something interesting...
<-time.After(3 * time.Second)
return nil
},
Retry: task.Retry{
strategy.Limit(3),
}
}

// Schedule the task to be continuously run in an infinite loop until manager.Close() is called
manager.RunTask(context.Background(), looping)
```

## Examples

You can find a sample executable in the [samples](samples) folder. To run them, clone the repository and run:

```sh
go run samples/slog/slog.go
```

## License

See the [LICENSE](LICENSE) file for details.

[badge-tests]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/test.yaml/badge.svg
[workflow-tests]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/test.yaml
[badge-ci]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/ci.yaml/badge.svg
[workflow-ci]: https://github.com/strvcom/strv-backend-go-background/actions/workflows/ci.yaml
[badge-codecov]: https://codecov.io/gh/strvcom/strv-backend-go-background/graph/badge.svg?token=ST3JD5GCRN
[codecov-dashboard]: https://codecov.io/gh/strvcom/strv-backend-go-background
3 changes: 2 additions & 1 deletion samples/slog/slog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ var (
func main() {
// Customise the default logger to output JSON
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, nil)))
slog.Info("application starting - press Ctrl+C to terminate")

manager := background.NewManagerWithOptions(background.Options{
// Use the provided Slog observer to save some development time
Observer: observer.Slog{},
Retry: []strategy.Strategy{
Retry: task.Retry{
strategy.Limit(1),
},
})
Expand Down
33 changes: 33 additions & 0 deletions task/logvaluer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package task

import "log/slog"

// LogValue implements slog.LogValuer.
func (t Type) LogValue() slog.Value {
switch t {
case TypeOneOff:
return slog.StringValue("oneoff")
case TypeLoop:
return slog.StringValue("loop")
default:
return slog.StringValue("invalid")
}
}

// LogValue implements slog.LogValuer.
func (definition Task) LogValue() slog.Value {
return slog.GroupValue(
slog.Any("type", definition.Type),
slog.Any("meta", definition.Meta),
)
}

// LogValue implements slog.LogValuer.
func (meta Metadata) LogValue() slog.Value {
values := make([]slog.Attr, 0, len(meta))
for key, value := range meta {
values = append(values, slog.String(key, value))
}

return slog.GroupValue(values...)
}
37 changes: 5 additions & 32 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package task
import (
"context"
"errors"
"log/slog"

"github.com/kamilsk/retry/v5/strategy"
)
Expand All @@ -19,18 +18,6 @@ const (
TypeLoop
)

// LogValue implements slog.LogValuer.
func (t Type) LogValue() slog.Value {
switch t {
case TypeOneOff:
return slog.StringValue("oneoff")
case TypeLoop:
return slog.StringValue("loop")
default:
return slog.StringValue("invalid")
}
}

var (
// ErrUnknownType is returned when the task type is not a valid value of Type.
ErrUnknownType = errors.New("unknown task type")
Expand All @@ -47,16 +34,8 @@ type Task struct {
Meta Metadata
// Retry defines how the task should be retried in case of failure (if at all). This overrides the default retry
// strategies you might have configured in the Manager. Several strategies are provided by
// github.com/kamilsk/retry/v5/strategy package.
Retry []strategy.Strategy
}

// LogValue implements slog.LogValuer.
func (definition Task) LogValue() slog.Value {
return slog.GroupValue(
slog.Any("type", definition.Type),
slog.Any("meta", definition.Meta),
)
// https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package.
Retry Retry
}

// Fn is the function to be executed in a goroutine.
Expand All @@ -66,12 +45,6 @@ type Fn func(ctx context.Context) error
// methods to help you identify the task or get more context about it.
type Metadata map[string]string

// LogValue implements slog.LogValuer.
func (meta Metadata) LogValue() slog.Value {
values := make([]slog.Attr, 0, len(meta))
for key, value := range meta {
values = append(values, slog.String(key, value))
}

return slog.GroupValue(values...)
}
// Retry defines how the task should be retried in case of failure (if at all). Several strategies are provided by
// https://pkg.go.dev/github.com/kamilsk/retry/v5/strategy package.
type Retry []strategy.Strategy