-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
14 changed files
with
685 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package runutil | ||
|
||
import ( | ||
"math" | ||
"math/rand" | ||
"time" | ||
) | ||
|
||
// Backoff is an interface to calculate the wait times between attemts of doing | ||
// a task. The first attempt must always return 0s. The Duration function | ||
// can be used together with the [Wait] function for a cancelable backoff sleep. | ||
type Backoff interface { | ||
Duration(int) time.Duration | ||
} | ||
|
||
// StaticBackoff always returns the same sleep duration to any but the 0th | ||
// attempt. | ||
type StaticBackoff struct { | ||
Sleep time.Duration | ||
} | ||
|
||
func (b StaticBackoff) Duration(attempt int) time.Duration { | ||
if attempt == 0 { | ||
return 0 | ||
} | ||
return b.Sleep | ||
} | ||
|
||
// ExponentialBackoff is a typical exponentail backoff with Jitter, based on | ||
// this blog post: | ||
// https://aws.amazon.com/ru/blogs/architecture/exponential-backoff-and-jitter/ | ||
type ExponentialBackoff struct { | ||
Initial time.Duration | ||
Max time.Duration | ||
JitterProportion float64 | ||
} | ||
|
||
func (b ExponentialBackoff) Duration(attempt int) time.Duration { | ||
if attempt == 0 { | ||
return time.Duration(0) | ||
} | ||
|
||
var ( | ||
maxWait = math.Pow(2., float64(attempt-1)) | ||
minWait = maxWait * (1. - b.JitterProportion) | ||
jitter = maxWait * b.JitterProportion * rand.Float64() | ||
totalWait = minWait + jitter | ||
) | ||
|
||
// Note: We must do the min() before muliplying with b.Initial, because it | ||
// is a time.Duration with nano second resolution and we might hit a number | ||
// overflow quite fast which results in not wait time at all. | ||
totalWait = min(totalWait, float64(b.Max)/float64(b.Initial)) | ||
|
||
return time.Duration(float64(b.Initial) * totalWait) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package runutil | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestBackoffTypes(t *testing.T) { | ||
assert.Implements(t, new(Backoff), ExponentialBackoff{}) | ||
assert.Implements(t, new(Backoff), StaticBackoff{}) | ||
} | ||
|
||
func TestStaticBackoff(t *testing.T) { | ||
bo := StaticBackoff{Sleep: 10 * time.Millisecond} | ||
assert.Equal(t, time.Duration(0), bo.Duration(0)) | ||
for i := 1; i < 10; i++ { | ||
assert.Equal(t, 10*time.Millisecond, bo.Duration(i)) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffWithoutJitter(t *testing.T) { | ||
cases := []struct { | ||
bo ExponentialBackoff | ||
want []int | ||
}{ | ||
{ | ||
bo: ExponentialBackoff{Initial: time.Second, Max: time.Minute}, | ||
want: []int{0, 1, 2, 4, 8, 16, 32, 60, 60, 60, 60}, | ||
}, | ||
{ | ||
bo: ExponentialBackoff{Initial: 2 * time.Second, Max: time.Minute}, | ||
want: []int{0, 2, 4, 8, 16, 32, 60, 60, 60, 60, 60}, | ||
}, | ||
{ | ||
bo: ExponentialBackoff{Initial: 3 * time.Second, Max: time.Minute}, | ||
want: []int{0, 3, 6, 12, 24, 48, 60, 60, 60}, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
name := fmt.Sprintf("i=%v,m=%v", tc.bo.Initial, tc.bo.Max) | ||
t.Run(name, func(t *testing.T) { | ||
require.Equal(t, 0., tc.bo.JitterProportion, | ||
"jitter contains randomness and cannot be tested here") | ||
for attempt, expected := range tc.want { | ||
want := time.Duration(expected) * time.Second | ||
have := tc.bo.Duration(attempt) | ||
assert.Equal(t, want, have) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffWithJitter(t *testing.T) { | ||
cases := []struct { | ||
bo ExponentialBackoff | ||
min []int | ||
max []int | ||
}{ | ||
{ | ||
// This is just a sanitiy check for the test itself. | ||
bo: ExponentialBackoff{Initial: 2 * time.Second, Max: time.Minute}, | ||
min: []int{0, 2, 4, 8, 16, 32, 60, 60, 60, 60, 60}, | ||
max: []int{0, 2, 4, 8, 16, 32, 60, 60, 60, 60, 60}, | ||
}, | ||
{ | ||
bo: ExponentialBackoff{Initial: 2 * time.Second, Max: time.Minute, JitterProportion: 0.5}, | ||
min: []int{0, 1, 2, 4, 8, 16, 30, 30, 30, 30, 30}, | ||
max: []int{0, 2, 4, 8, 16, 32, 60, 60, 60, 60, 60}, | ||
}, | ||
} | ||
|
||
for _, tc := range cases { | ||
name := fmt.Sprintf("i=%v,m=%v,j=%v", tc.bo.Initial, tc.bo.Max, tc.bo.JitterProportion) | ||
t.Run(name, func(t *testing.T) { | ||
for attempt := range tc.min { | ||
wantMin := time.Duration(tc.min[attempt]) * time.Second | ||
wantMax := time.Duration(tc.max[attempt]) * time.Second | ||
have := tc.bo.Duration(attempt) | ||
|
||
assert.GreaterOrEqual(t, have, wantMin, "attempt #%d", attempt) | ||
assert.LessOrEqual(t, have, wantMax, "attempt #%d", attempt) | ||
} | ||
}) | ||
} | ||
} | ||
|
||
func TestExponentialBackoffWithHighAttempts(t *testing.T) { | ||
bo := ExponentialBackoff{ | ||
Initial: time.Minute, | ||
Max: 5 * time.Minute, | ||
JitterProportion: 0.5, | ||
} | ||
|
||
cases := []int{1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8} | ||
|
||
for _, attempt := range cases { | ||
t.Run(fmt.Sprint(attempt), func(t *testing.T) { | ||
duration := bo.Duration(attempt) | ||
assert.Greater(t, duration, time.Duration(0)) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package runutil | ||
|
||
import ( | ||
"context" | ||
) | ||
|
||
// DeclarativeWorker is an alternative to building the worker behaviour with | ||
// chained functions.If automatically chains worker functions based on defined | ||
// field in the most sensful order. | ||
// | ||
// It satisfies the Worker interface for easier use. | ||
type DeclarativeWorker struct { | ||
Name string | ||
Worker Worker | ||
Retry Backoff | ||
} | ||
|
||
func (w DeclarativeWorker) Run(ctx context.Context) error { | ||
worker := w.Worker | ||
|
||
if w.Name != "" { | ||
worker = NamedWorker(worker, w.Name) | ||
} | ||
|
||
if w.Retry != nil { | ||
worker = Retry(worker, w.Retry) | ||
} | ||
|
||
return worker.Run(ctx) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
package runutil | ||
|
||
import ( | ||
"context" | ||
|
||
"go.uber.org/dig" | ||
) | ||
|
||
// WorkerConfiger is for Workers that configure themselfes. This means they can define repeats, backoff and jitter themselves. | ||
// | ||
// func (w *CommitFetcher) Workers() []runutil.Worker { | ||
// return []runutil.Worker{ | ||
// runutil.DeclarativeWorker{ | ||
// Name: "Commits", | ||
// Worker: runutil.Repeat(5*time.Second, runutil.JobFunc(w.fetchCommits)), | ||
// Retry: runutil.ExponentialBackoff{ | ||
// Initial: time.Second, | ||
// Max: time.Minute, | ||
// JitterProportion: 0.5, | ||
// }, | ||
// }, | ||
// runutil.DeclarativeWorker{ | ||
// Name: "PRs", | ||
// Worker: runutil.Repeat(5*time.Second, runutil.JobFunc(w.fetchPRs)), | ||
// Retry: runutil.ExponentialBackoff{ | ||
// Initial: time.Second, | ||
// Max: time.Minute, | ||
// JitterProportion: 0.5, | ||
// }, | ||
// }, | ||
// } | ||
// } | ||
type WorkerConfiger interface { | ||
Workers() []Worker | ||
} | ||
|
||
// WorkerGroup is a input parameter struct for Dig to retrieve all instances | ||
// that implement the WorkerConfigerer. | ||
type WorkerGroup struct { | ||
dig.In | ||
All []WorkerConfiger `group:"worker"` | ||
} | ||
|
||
// ProvideWorker injects a WorkerConfiger, which can later be started with | ||
// RunProvidedWorkers. | ||
func ProvideWorker(c *dig.Container, fn any) error { | ||
return c.Provide(fn, dig.Group("worker"), dig.As(new(WorkerConfiger))) | ||
} | ||
|
||
// RunProvidedWorkers starts all workers there were injected using | ||
// RunAllWorkers. | ||
func RunProvidedWorkers(ctx context.Context, c *dig.Container) error { | ||
return c.Invoke(func(in WorkerGroup) error { | ||
workers := []Worker{} | ||
for _, c := range in.All { | ||
for _, w := range c.Workers() { | ||
workers = append(workers, | ||
NamedWorkerFromType(w, c), | ||
) | ||
} | ||
} | ||
return RunAllWorkers(ctx, workers...) | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package runutil | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/rebuy-de/rebuy-go-sdk/v8/pkg/logutil" | ||
) | ||
|
||
// NamedWorker assigns a new logutil subsystem on startup. See logutil.Start. | ||
func NamedWorker(worker Worker, name string, a ...any) Worker { | ||
return WorkerFunc(func(ctx context.Context) error { | ||
ctx = logutil.Start(ctx, fmt.Sprintf(name, a...)) | ||
return worker.Run(ctx) | ||
}) | ||
} | ||
|
||
// NamedWorkerFromType assigns a new logutil subsystem on startup based on the | ||
// provided type name. See logutil.Start. | ||
func NamedWorkerFromType(worker Worker, t any) Worker { | ||
name := fmt.Sprintf("%T", t) | ||
name = strings.Trim(name, "*") | ||
name = strings.Replace(name, ".", "/", 1) | ||
return NamedWorker(worker, name) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package runutil | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/rebuy-de/rebuy-go-sdk/v8/pkg/logutil" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" | ||
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | ||
) | ||
|
||
type jobWorker struct { | ||
wait time.Duration | ||
job Job | ||
startImmediately bool | ||
} | ||
|
||
// Repeat reruns a job indefinitely until the context gets cancelled. The job | ||
// will run at most once in the given time interval. This means the wait | ||
// duration is not the sleep between executions, but the time between the start | ||
// of runs (based on [time.Ticker]). | ||
func Repeat(wait time.Duration, job Job, opts ...RepeatOption) Worker { | ||
w := &jobWorker{ | ||
wait: wait, | ||
job: job, | ||
} | ||
|
||
for _, o := range opts { | ||
o(w) | ||
} | ||
|
||
return w | ||
} | ||
|
||
type RepeatOption func(*jobWorker) | ||
|
||
func WithStartImmediately() RepeatOption { | ||
return func(w *jobWorker) { | ||
w.startImmediately = true | ||
} | ||
} | ||
|
||
func (w jobWorker) Run(ctx context.Context) error { | ||
if w.startImmediately { | ||
err := w.runOnce(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
ticker := time.NewTicker(w.wait) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return nil | ||
case <-ticker.C: | ||
err := w.runOnce(ctx) | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (w jobWorker) runOnce(ctx context.Context) error { | ||
span, ctx := tracer.StartSpanFromContext( | ||
ctx, "runutil.job", | ||
tracer.Tag(ext.SpanKind, ext.SpanKindInternal), | ||
tracer.Tag(ext.ResourceName, logutil.GetSubsystem(ctx)), | ||
) | ||
err := w.job.RunOnce(ctx) | ||
if err != nil { | ||
span.Finish(tracer.WithError(err)) | ||
return err | ||
} else { | ||
span.Finish() | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.