diff --git a/scheduler/errors/build_errors.go b/scheduler/errors/build_errors.go index 9c80bd6..93ea5ad 100644 --- a/scheduler/errors/build_errors.go +++ b/scheduler/errors/build_errors.go @@ -233,3 +233,58 @@ func NewProcessPanicErrorBuilder() *ProcessPanicErrorBuilder { func NewProcessPanicError() Error { return NewProcessPanicErrorBuilder().Build() } + +// RecoveryDescriptorSection defines a section of errors with the following scope: +// Recovery descriptor errors +var RecoveryDescriptorSection = &impl.ErrorSection{ + Key: "recovery_descriptor", + Title: "Recovery descriptor errors", +} + +// RecoveryDescriptorMaxRetriesReachedCode is the code for an instance of "max_retries_reached". +const RecoveryDescriptorMaxRetriesReachedCode = "hsch_recovery_descriptor_max_retries_reached" + +// IsRecoveryDescriptorMaxRetriesReached tests whether a given error is an instance of "max_retries_reached". +func IsRecoveryDescriptorMaxRetriesReached(err errawr.Error) bool { + return err != nil && err.Is(RecoveryDescriptorMaxRetriesReachedCode) +} + +// IsRecoveryDescriptorMaxRetriesReached tests whether a given error is an instance of "max_retries_reached". +func (External) IsRecoveryDescriptorMaxRetriesReached(err errawr.Error) bool { + return IsRecoveryDescriptorMaxRetriesReached(err) +} + +// RecoveryDescriptorMaxRetriesReachedBuilder is a builder for "max_retries_reached" errors. +type RecoveryDescriptorMaxRetriesReachedBuilder struct { + arguments impl.ErrorArguments +} + +// Build creates the error for the code "max_retries_reached" from this builder. +func (b *RecoveryDescriptorMaxRetriesReachedBuilder) Build() Error { + description := &impl.ErrorDescription{ + Friendly: "The max retries ({{max_retries}} have been reached.", + Technical: "The max retries ({{max_retries}} have been reached.", + } + + return &impl.Error{ + ErrorArguments: b.arguments, + ErrorCode: "max_retries_reached", + ErrorDescription: description, + ErrorDomain: Domain, + ErrorMetadata: &impl.ErrorMetadata{}, + ErrorSection: RecoveryDescriptorSection, + ErrorSensitivity: errawr.ErrorSensitivityNone, + ErrorTitle: "Max retries reached", + Version: 1, + } +} + +// NewRecoveryDescriptorMaxRetriesReachedBuilder creates a new error builder for the code "max_retries_reached". +func NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries int64) *RecoveryDescriptorMaxRetriesReachedBuilder { + return &RecoveryDescriptorMaxRetriesReachedBuilder{arguments: impl.ErrorArguments{"max_retries": impl.NewErrorArgument(maxRetries, "the configured max retries")}} +} + +// NewRecoveryDescriptorMaxRetriesReached creates a new error with the code "max_retries_reached". +func NewRecoveryDescriptorMaxRetriesReached(maxRetries int64) Error { + return NewRecoveryDescriptorMaxRetriesReachedBuilder(maxRetries).Build() +} diff --git a/scheduler/errors/errors.yml b/scheduler/errors/errors.yml index 1777478..c1e57c6 100644 --- a/scheduler/errors/errors.yml +++ b/scheduler/errors/errors.yml @@ -31,6 +31,21 @@ sections: type: description: the type of this descriptor + # + # Recovery descriptor + # + recovery_descriptor: + title: Recovery descriptor errors + errors: + max_retries_reached: + title: Max retries reached + description: > + The max retries ({{max_retries}} have been reached. + arguments: + max_retries: + type: integer + description: the configured max retries + # # Process errors # diff --git a/scheduler/recovery.go b/scheduler/recovery.go index 52047b8..ed6bff2 100644 --- a/scheduler/recovery.go +++ b/scheduler/recovery.go @@ -2,64 +2,53 @@ package scheduler import ( "context" - "errors" "reflect" "time" "github.com/puppetlabs/horsehead/v2/netutil" + "github.com/puppetlabs/horsehead/v2/scheduler/errors" ) const ( - defaultBackoffMultiplier = time.Millisecond * 5 - defaultMaxRetries = 10 - defaultResetRetriesTimerDuration = time.Second * 10 + defaultBackoffMultiplier = time.Millisecond * 5 + defaultResetRetriesAfter = time.Second * 10 ) -// ErrMaxRetriesReached is the error returned by RecoveryDescriptor if the max retries -// have been reached. -var ErrMaxRetriesReached = errors.New("RecoveryDescriptor: max retries have been reached") - // RecoveryDescriptorOptions contains fields that allow backoff and retry parameters // to be set. type RecoveryDescriptorOptions struct { // BackoffMultiplier is the timing multiplier between attempts using netutil.Backoff. + // + // Default: 5ms BackoffMultiplier time.Duration // MaxRetries is the max times the RecoveryDescriptor should attempt to run the delegate - // descriptor during a reset retries duration. + // descriptor during a reset retries duration. If this option is <= 0 then it means + // retry inifinitly; however, the backoff multiplier still applies. + // + // Default: 0 MaxRetries int - // ResetRetriesTimerDuration is the time it takes to reset the retry count when running + // ResetRetriesAfter is the time it takes to reset the retry count when running // a delegate descriptor. - ResetRetriesTimerDuration time.Duration + // + // Default: 10s + ResetRetriesAfter time.Duration } // RecoveryDescriptor wraps a given descriptor so that it restarts if the // descriptor itself fails. This is useful for descriptors that work off of // external information (APIs, events, etc.). type RecoveryDescriptor struct { - delegate Descriptor - backoff netutil.Backoff - maxRetries int - currentRetries int - resetDuration time.Duration + delegate Descriptor + backoff netutil.Backoff + maxRetries int + resetAfter time.Duration } var _ Descriptor = &RecoveryDescriptor{} -// runOnce attempts to run the delegate descriptor. It starts a timer that waits for resetRetriesTimerDuration -// that will reset the retry attempt count to 0 if the delegate runs for the duration without an error. This is -// to prevent hours or days from going by, then an error happens again incrementing the count. If this happens 10 -// times, the descriptor will seemingly shutdown after 1 retry, causing confusion. func (rd *RecoveryDescriptor) runOnce(ctx context.Context, pc chan<- Process) (bool, error) { - timer := time.AfterFunc(rd.resetDuration, func() { - rd.currentRetries = 0 - }) - err := rd.delegate.Run(ctx, pc) - // If the timer is already triggered, then this will just return false. So it's - // fine to call blindly here. - timer.Stop() - select { case <-ctx.Done(): return false, err @@ -74,24 +63,32 @@ func (rd *RecoveryDescriptor) runOnce(ctx context.Context, pc chan<- Process) (b } // Run delegates work to another descriptor, catching any errors are restarting -// the descriptor immediately if an error occurs. It never returns an error. It -// only terminates when the context is done. +// the descriptor immediately if an error occurs. It might return a max retries error. +// It only terminates when the context is done or the max retries have been exceeded. func (rd *RecoveryDescriptor) Run(ctx context.Context, pc chan<- Process) error { + var retries int + for { + start := time.Now() + if cont, err := rd.runOnce(ctx, pc); err != nil { return err } else if !cont { break } - if rd.currentRetries == rd.maxRetries { + if time.Now().Sub(start) >= rd.resetAfter { + retries = 0 + } + + if rd.maxRetries > 0 && retries == rd.maxRetries { log(ctx).Error("max retries reached; stopping descriptor", "descriptor", reflect.TypeOf(rd.delegate).String()) - return ErrMaxRetriesReached + return errors.NewRecoveryDescriptorMaxRetriesReached(int64(rd.maxRetries)) } - rd.currentRetries++ + retries++ - if err := rd.backoff.Backoff(ctx, rd.currentRetries); err != nil { + if err := rd.backoff.Backoff(ctx, retries); err != nil { return err } } @@ -113,21 +110,17 @@ func NewRecoveryDescriptorWithOptions(delegate Descriptor, opts RecoveryDescript opts.BackoffMultiplier = defaultBackoffMultiplier } - if opts.MaxRetries == 0 { - opts.MaxRetries = defaultMaxRetries - } - - if opts.ResetRetriesTimerDuration == 0 { - opts.ResetRetriesTimerDuration = defaultResetRetriesTimerDuration + if opts.ResetRetriesAfter == 0 { + opts.ResetRetriesAfter = defaultResetRetriesAfter } // TODO migrate to backoff's NextRun once implemented backoff := &netutil.ExponentialBackoff{Multiplier: opts.BackoffMultiplier} return &RecoveryDescriptor{ - delegate: delegate, - backoff: backoff, - maxRetries: opts.MaxRetries, - resetDuration: opts.ResetRetriesTimerDuration, + delegate: delegate, + backoff: backoff, + maxRetries: opts.MaxRetries, + resetAfter: opts.ResetRetriesAfter, } } diff --git a/scheduler/recovery_test.go b/scheduler/recovery_test.go index fd21b1c..72b0e7f 100644 --- a/scheduler/recovery_test.go +++ b/scheduler/recovery_test.go @@ -29,18 +29,22 @@ func (d *mockErrorDescriptor) Run(ctx context.Context, pc chan<- Process) error func TestRecoverySchedulerStops(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - descriptor := NewRecoveryDescriptor(&mockErrorDescriptor{ + mock := &mockErrorDescriptor{ errCount: 10, // make sure we never succeed successAfterCount: 15, - }) + } + + opts := RecoveryDescriptorOptions{ + MaxRetries: 10, + } + descriptor := NewRecoveryDescriptorWithOptions(mock, opts) pc := make(chan Process) defer cancel() require.Error(t, descriptor.Run(ctx, pc)) - require.Equal(t, 10, descriptor.currentRetries) } type mockRetryResetDescriptor struct { @@ -73,16 +77,16 @@ func TestRecoverySchedulerRetryCountReset(t *testing.T) { successDuration: successDuration, } - descriptor := NewRecoveryDescriptorWithOptions(mock, RecoveryDescriptorOptions{ - ResetRetriesTimerDuration: successDuration - (time.Millisecond * 500), - }) + opts := RecoveryDescriptorOptions{ + MaxRetries: 10, + ResetRetriesAfter: successDuration - (time.Millisecond * 500), + } + descriptor := NewRecoveryDescriptorWithOptions(mock, opts) pc := make(chan Process) defer cancel() require.NoError(t, descriptor.Run(ctx, pc)) - - require.Equal(t, 0, descriptor.currentRetries) require.Equal(t, 0, mock.count) }