Skip to content
This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Permalink
Merge pull request #21 from puppetlabs/features/add-backoff-to-recove…
Browse files Browse the repository at this point in the history
…ry-descriptor

Features/add backoff to recovery descriptor
  • Loading branch information
kyleterry authored Jan 17, 2020
2 parents 026ca2d + 9f7d392 commit a75557e
Show file tree
Hide file tree
Showing 19 changed files with 1,603 additions and 337 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/node_modules/

*.swp
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ require (
github.com/puppetlabs/errawr-gen v1.0.1
github.com/puppetlabs/errawr-go/v2 v2.2.0
github.com/reflect/xparse v0.0.0-20171128034418-ab29bdc5e11c
github.com/stretchr/testify v1.3.0
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.4.0
golang.org/x/text v0.3.0
gopkg.in/yaml.v2 v2.2.7 // indirect
)
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,14 @@ github.com/shurcooL/httpfs v0.0.0-20190527155220-6a4d4a70508b h1:4kg1wyftSKxLtnP
github.com/shurcooL/httpfs v0.0.0-20190527155220-6a4d4a70508b/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd h1:ug7PpSOB5RBPK1Kg6qskGBoP3Vnj/aNYFTznWvlkGo0=
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd/go.mod h1:TrYk7fJVaAttu97ZZKrO9UbRa8izdowaMIZcxYMbVaw=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/xeipuuv/gojsonpointer v0.0.0-20170225233418-6fe8760cad35 h1:0TnXeVP6mx+A4CBf8cQVkQfkhyGBQCmJcT4g6zKzm7M=
github.com/xeipuuv/gojsonpointer v0.0.0-20170225233418-6fe8760cad35/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20150808065054-e02fc20de94c h1:XZWnr3bsDQWAZg4Ne+cPoXRPILrNlPNQfxBuwLl43is=
Expand Down Expand Up @@ -113,3 +118,5 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
55 changes: 55 additions & 0 deletions scheduler/errors/build_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,58 @@ func NewProcessPanicErrorBuilder() *ProcessPanicErrorBuilder {
func NewProcessPanicError() Error {
return NewProcessPanicErrorBuilder().Build()
}

// RecoveryDescriptorSection defines a section of errors with the following scope:
// Recovery descriptor errors
//
// It is attached as the ErrorSection of every error produced by
// RecoveryDescriptorMaxRetriesReachedBuilder.Build.
var RecoveryDescriptorSection = &impl.ErrorSection{
Key: "recovery_descriptor",
Title: "Recovery descriptor errors",
}

// RecoveryDescriptorMaxRetriesReachedCode is the code for an instance of "max_retries_reached".
// It is the value IsRecoveryDescriptorMaxRetriesReached passes to errawr.Error.Is.
const RecoveryDescriptorMaxRetriesReachedCode = "hsch_recovery_descriptor_max_retries_reached"

// IsRecoveryDescriptorMaxRetriesReached reports whether err is an instance of
// the "max_retries_reached" error. A nil err never matches.
func IsRecoveryDescriptorMaxRetriesReached(err errawr.Error) bool {
	if err == nil {
		return false
	}

	return err.Is(RecoveryDescriptorMaxRetriesReachedCode)
}

// IsRecoveryDescriptorMaxRetriesReached tests whether a given error is an instance of "max_retries_reached".
// It is the method form of the package-level function of the same name and
// simply delegates to it.
func (External) IsRecoveryDescriptorMaxRetriesReached(err errawr.Error) bool {
return IsRecoveryDescriptorMaxRetriesReached(err)
}

// RecoveryDescriptorMaxRetriesReachedBuilder is a builder for "max_retries_reached" errors.
type RecoveryDescriptorMaxRetriesReachedBuilder struct {
// arguments is copied into the ErrorArguments field of the error
// returned by Build.
arguments impl.ErrorArguments
}

// Build creates the error for the code "max_retries_reached" from this builder.
//
// The returned error belongs to RecoveryDescriptorSection, carries the
// builder's arguments, and uses the same text for its friendly and technical
// descriptions. The "{{max_retries}}" placeholder in that text names the
// "max_retries" argument.
func (b *RecoveryDescriptorMaxRetriesReachedBuilder) Build() Error {
	// The parenthesis around the placeholder was previously left unclosed,
	// which produced messages such as "The max retries (10 have been reached."
	description := &impl.ErrorDescription{
		Friendly:  "The max retries ({{max_retries}}) have been reached.",
		Technical: "The max retries ({{max_retries}}) have been reached.",
	}

	return &impl.Error{
		ErrorArguments:   b.arguments,
		ErrorCode:        "max_retries_reached",
		ErrorDescription: description,
		ErrorDomain:      Domain,
		ErrorMetadata:    &impl.ErrorMetadata{},
		ErrorSection:     RecoveryDescriptorSection,
		ErrorSensitivity: errawr.ErrorSensitivityNone,
		ErrorTitle:       "Max retries reached",
		Version:          1,
	}
}

// NewRecoveryDescriptorMaxRetriesReachedBuilder creates a new error builder for
// the code "max_retries_reached". maxRetries is stored as the "max_retries"
// argument of the error that the builder produces.
func NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries int64) *RecoveryDescriptorMaxRetriesReachedBuilder {
	args := impl.ErrorArguments{
		"max_retries": impl.NewErrorArgument(maxRetries, "the configured max retries"),
	}

	return &RecoveryDescriptorMaxRetriesReachedBuilder{arguments: args}
}

// NewRecoveryDescriptorMaxRetriesReached creates a new error with the code "max_retries_reached".
// It is shorthand for creating a builder with
// NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries) and calling Build on it.
func NewRecoveryDescriptorMaxRetriesReached(maxRetries int64) Error {
return NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries).Build()
}
15 changes: 15 additions & 0 deletions scheduler/errors/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,21 @@ sections:
type:
description: the type of this descriptor

#
# Recovery descriptor
#
recovery_descriptor:
title: Recovery descriptor errors
errors:
max_retries_reached:
title: Max retries reached
description: >
The max retries ({{max_retries}}) have been reached.
arguments:
max_retries:
type: integer
description: the configured max retries

#
# Process errors
#
Expand Down
84 changes: 79 additions & 5 deletions scheduler/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,45 @@ package scheduler
import (
"context"
"reflect"
"time"

"github.com/puppetlabs/horsehead/v2/netutil"
"github.com/puppetlabs/horsehead/v2/scheduler/errors"
)

const (
// defaultBackoffMultiplier is used by NewRecoveryDescriptorWithOptions when
// RecoveryDescriptorOptions.BackoffMultiplier is left at zero.
defaultBackoffMultiplier = time.Millisecond * 5
// defaultResetRetriesAfter is used by NewRecoveryDescriptorWithOptions when
// RecoveryDescriptorOptions.ResetRetriesAfter is left at zero.
defaultResetRetriesAfter = time.Second * 10
)

// RecoveryDescriptorOptions contains fields that allow backoff and retry parameters
// to be set.
type RecoveryDescriptorOptions struct {
// BackoffMultiplier is the timing multiplier between attempts using netutil.Backoff.
//
// Default: 5ms
BackoffMultiplier time.Duration
// MaxRetries is the maximum number of times the RecoveryDescriptor should attempt
// to run the delegate descriptor during a reset retries duration. If this option
// is <= 0 then it means retry infinitely; however, the backoff multiplier still
// applies.
//
// Default: 0
MaxRetries int
// ResetRetriesAfter is the time it takes to reset the retry count when running
// a delegate descriptor.
//
// Default: 10s
ResetRetriesAfter time.Duration
}

// RecoveryDescriptor wraps a given descriptor so that it restarts if the
// descriptor itself fails. This is useful for descriptors that work off of
// external information (APIs, events, etc.).
//
// Restarts are spaced out by a backoff and may be capped by a maximum number
// of retries; see RecoveryDescriptorOptions.
type RecoveryDescriptor struct {
	// delegate is the wrapped descriptor.
	delegate Descriptor
	// backoff is invoked by Run between attempts with the current retry count.
	backoff netutil.Backoff
	// maxRetries caps the number of retries; values <= 0 disable the cap.
	maxRetries int
	// resetAfter is how long a single attempt must last before Run resets
	// its retry count to zero.
	resetAfter time.Duration
}

var _ Descriptor = &RecoveryDescriptor{}
Expand All @@ -31,22 +63,64 @@ func (rd *RecoveryDescriptor) runOnce(ctx context.Context, pc chan<- Process) (b
}

// Run delegates work to another descriptor, catching any errors and restarting
// the descriptor after a backoff delay if an error occurs.
//
// It terminates when runOnce reports that it should not continue, when runOnce
// or the backoff returns an error, or when the configured maximum number of
// retries has been reached. In the last case it returns a
// "max_retries_reached" error.
func (rd *RecoveryDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	var retries int

	for {
		start := time.Now()

		cont, err := rd.runOnce(ctx, pc)
		if err != nil {
			return err
		}
		if !cont {
			return nil
		}

		// An attempt that lasted at least resetAfter clears the retry count,
		// so only failures in quick succession count towards maxRetries.
		if time.Since(start) >= rd.resetAfter {
			retries = 0
		}

		// A maxRetries of zero or less disables the cap entirely.
		if rd.maxRetries > 0 && retries >= rd.maxRetries {
			log(ctx).Error("max retries reached; stopping descriptor", "descriptor", reflect.TypeOf(rd.delegate).String())
			return errors.NewRecoveryDescriptorMaxRetriesReached(int64(rd.maxRetries))
		}

		retries++

		// NOTE(review): presumably Backoff blocks for a delay derived from
		// retries and fails if ctx ends first — confirm against netutil.
		if err := rd.backoff.Backoff(ctx, retries); err != nil {
			return err
		}
	}
}

// NewRecoveryDescriptor creates a new recovering descriptor wrapping the given
// delegate descriptor. Default backoff and retry parameters will be used.
//
// It is equivalent to calling NewRecoveryDescriptorWithOptions with a zero
// RecoveryDescriptorOptions value.
func NewRecoveryDescriptor(delegate Descriptor) *RecoveryDescriptor {
	return NewRecoveryDescriptorWithOptions(delegate, RecoveryDescriptorOptions{})
}

// NewRecoveryDescriptorWithOptions creates a new recovering descriptor wrapping the
// given delegate descriptor. It takes RecoveryDescriptorOptions to tune backoff and retry
// parameters.
//
// A BackoffMultiplier or ResetRetriesAfter that is zero or negative is replaced
// by its default (defaultBackoffMultiplier and defaultResetRetriesAfter
// respectively). MaxRetries is used as given.
func NewRecoveryDescriptorWithOptions(delegate Descriptor, opts RecoveryDescriptorOptions) *RecoveryDescriptor {
	// Negative durations are treated like unset ones. In particular, a
	// negative ResetRetriesAfter would make Run reset its retry count after
	// every attempt, so MaxRetries could never take effect.
	if opts.BackoffMultiplier <= 0 {
		opts.BackoffMultiplier = defaultBackoffMultiplier
	}

	if opts.ResetRetriesAfter <= 0 {
		opts.ResetRetriesAfter = defaultResetRetriesAfter
	}

	// TODO migrate to backoff's NextRun once implemented
	backoff := &netutil.ExponentialBackoff{Multiplier: opts.BackoffMultiplier}

	return &RecoveryDescriptor{
		delegate:   delegate,
		backoff:    backoff,
		maxRetries: opts.MaxRetries,
		resetAfter: opts.ResetRetriesAfter,
	}
}
92 changes: 92 additions & 0 deletions scheduler/recovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package scheduler

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// mockErrorDescriptor is a test descriptor whose Run fails a bounded number of
// times before it starts succeeding.
type mockErrorDescriptor struct {
	// errCount is reported in the error text and decremented on every
	// failure; Run stops failing once it drops below zero.
	errCount int
	// successAfterCount is decremented on every failure; Run stops failing
	// once it reaches exactly zero.
	successAfterCount int
}

// Run returns nil once errCount is negative or successAfterCount is zero.
// Otherwise it returns an error naming the current errCount and decrements
// both counters. Neither ctx nor pc is used.
func (d *mockErrorDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	if d.errCount < 0 || d.successAfterCount == 0 {
		return nil
	}

	failure := fmt.Errorf("err count %d", d.errCount)

	d.errCount--
	d.successAfterCount--

	return failure
}

// TestRecoverySchedulerStops checks that Run returns an error when the
// delegate keeps failing with MaxRetries set.
func TestRecoverySchedulerStops(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// With these counters the mock fails eleven times in a row (errCount 10
	// down to 0), one more than MaxRetries below; successAfterCount is set
	// high enough that it never cuts the failures short.
	failing := &mockErrorDescriptor{
		errCount:          10,
		successAfterCount: 15,
	}

	rd := NewRecoveryDescriptorWithOptions(failing, RecoveryDescriptorOptions{
		MaxRetries: 10,
	})

	processes := make(chan Process)

	require.Error(t, rd.Run(ctx, processes))
}

// mockRetryResetDescriptor is a test descriptor that fails count times and
// then performs one long, successful run.
type mockRetryResetDescriptor struct {
	// count is the number of failures still to report.
	count int
	// successDuration is how long the successful run blocks before returning.
	successDuration time.Duration
	// cancel is called at the end of the successful run.
	cancel context.CancelFunc
}

// Run returns an error naming the current count and decrements it while count
// is non-zero. Once count is zero it blocks for successDuration, calls cancel
// and returns nil. Neither ctx nor pc is used.
func (d *mockRetryResetDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	if d.count != 0 {
		failure := fmt.Errorf("err count %d", d.count)
		d.count--

		return failure
	}

	time.Sleep(d.successDuration)
	d.cancel()

	return nil
}

// TestRecoverySchedulerRetryCountReset runs a delegate that fails five times
// and then succeeds with a run lasting longer than ResetRetriesAfter. It
// expects Run to finish without error and the mock to have used up all of its
// failures.
func TestRecoverySchedulerRetryCountReset(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	const successDuration = time.Second

	// The mock cancels the context itself at the end of its successful run.
	delegate := &mockRetryResetDescriptor{
		count:           5,
		successDuration: successDuration,
		cancel:          cancel,
	}

	rd := NewRecoveryDescriptorWithOptions(delegate, RecoveryDescriptorOptions{
		MaxRetries: 10,
		// Half a second shorter than the successful run.
		ResetRetriesAfter: successDuration - 500*time.Millisecond,
	})

	processes := make(chan Process)

	require.NoError(t, rd.Run(ctx, processes))
	require.Equal(t, 0, delegate.count)
}
Loading

0 comments on commit a75557e

Please sign in to comment.