Skip to content

Commit

Permalink
Bigger but more gradual backoff with some randomization
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Nov 1, 2023
1 parent dfee990 commit ae41721
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 15 deletions.
61 changes: 46 additions & 15 deletions service/adapters/sqlite/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,26 @@ import (
"context"
"database/sql"
"math"
"math/rand"
"sync"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-crossposting-service/internal/logging"
)

type BackoffManager interface {
// GetMessageErrorBackoff is used to backoff reprocessing of a single
// specific message if its processing fails. The first time message
// processing fails 1 is passed to this function, the second time 2 etc.
GetMessageErrorBackoff(nackCount int) time.Duration

// GetNoMessagesBackoff is used to backoff querying for new messages on the
// queue. The first time in a row where the query returns no messages 1 is
// passed to this function, then 2 is passed etc.
GetNoMessagesBackoff(tick int) time.Duration
}

type Message struct {
uuid string
payload []byte
Expand Down Expand Up @@ -86,12 +99,17 @@ var (
)

type PubSub struct {
db *sql.DB
logger logging.Logger
backoffManager BackoffManager
db *sql.DB
logger logging.Logger
}

func NewPubSub(db *sql.DB, logger logging.Logger) *PubSub {
return &PubSub{db: db, logger: logger}
return &PubSub{
backoffManager: NewDefaultBackoffManager(),
db: db,
logger: logger,
}
}

func (p *PubSub) InitializingQueries() []string {
Expand Down Expand Up @@ -156,7 +174,7 @@ func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedM
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
noMessagesCounter++
backoff := getNoMessagesBackoff(noMessagesCounter)
backoff := p.backoffManager.GetNoMessagesBackoff(noMessagesCounter)

p.logger.Trace().
WithField("duration", backoff).
Expand Down Expand Up @@ -235,7 +253,7 @@ func (p *PubSub) nack(msg Message) error {
}

nackCount = nackCount + 1
backoffDuration := getMessageErrorBackoff(nackCount)
backoffDuration := p.backoffManager.GetMessageErrorBackoff(nackCount)
backoffUntil := time.Now().Add(backoffDuration)

p.logger.Trace().
Expand All @@ -255,18 +273,31 @@ func (p *PubSub) nack(msg Message) error {
return nil
}

func getMessageErrorBackoff(nackCount int) time.Duration {
a := time.Duration(math.Pow(5, float64(nackCount-1))) * time.Second
b := 1 * time.Hour
return min(a, b)
type executor interface {
Exec(query string, args ...any) (sql.Result, error)
}

func getNoMessagesBackoff(tick int) time.Duration {
a := time.Duration(math.Pow(2, float64(tick-1))) * time.Second
b := 30 * time.Second
return min(a, b)
const (
maxDefaultMessageErrorBackoff = 6 * time.Hour
maxDefaultNoMessagesBackoff = 30 * time.Second

randomizeMessageErrorBackoffByFraction = 0.1
)

type DefaultBackoffManager struct {
}

type executor interface {
Exec(query string, args ...any) (sql.Result, error)
func NewDefaultBackoffManager() DefaultBackoffManager {
return DefaultBackoffManager{}
}

func (d DefaultBackoffManager) GetMessageErrorBackoff(nackCount int) time.Duration {
a := time.Duration(math.Pow(4, float64(nackCount-1))) * time.Second
randFraction := 1 - randomizeMessageErrorBackoffByFraction + 2*randomizeMessageErrorBackoffByFraction*rand.Float64()
return time.Duration(float64(min(a, maxDefaultMessageErrorBackoff)) * randFraction)
}

func (d DefaultBackoffManager) GetNoMessagesBackoff(tick int) time.Duration {
a := time.Duration(math.Pow(2, float64(tick-1))) * time.Second
return min(a, maxDefaultNoMessagesBackoff)
}
22 changes: 22 additions & 0 deletions service/adapters/sqlite/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,25 @@ func TestPubSub_QueueLengthReportsNumberOfElementsInQueue(t *testing.T) {
assert.Equal(t, 1, n)
}, 10*time.Second, 100*time.Millisecond)
}

func TestDefaultBackoffManager_GetMessageErrorBackoffStatisticallyFallsWithinCertainEpsilon(t *testing.T) {
const numSamples = 1000

m := sqlite.NewDefaultBackoffManager()
for i := 1; i < 10; i++ {
var sum float64
var avg float64

for samples := 0; samples < numSamples; samples++ {
backoff := m.GetMessageErrorBackoff(i)
if samples > numSamples/2 {
require.InEpsilonf(t, avg, backoff, 0.15, "failed for i=%d and sample=%d", i, samples)
}

sum += float64(backoff)
avg = sum / float64(samples)
}

t.Log(i, time.Duration(avg))
}
}

0 comments on commit ae41721

Please sign in to comment.