From cc8524c8b5faf8f45d1de9a76f77f6a3c508059b Mon Sep 17 00:00:00 2001 From: boreq Date: Wed, 1 Nov 2023 11:31:54 +0900 Subject: [PATCH] Bigger but more gradual backoff with some randomization --- service/adapters/sqlite/pubsub.go | 61 +++++++++++++++++++------- service/adapters/sqlite/pubsub_test.go | 22 ++++++++++ 2 files changed, 68 insertions(+), 15 deletions(-) diff --git a/service/adapters/sqlite/pubsub.go b/service/adapters/sqlite/pubsub.go index efa1adb..59adaa9 100644 --- a/service/adapters/sqlite/pubsub.go +++ b/service/adapters/sqlite/pubsub.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "math" + "math/rand" "sync" "time" @@ -11,6 +12,18 @@ import ( "github.com/planetary-social/nos-crossposting-service/internal/logging" ) +type BackoffManager interface { + // GetMessageErrorBackoff returns how long to wait before reprocessing a + // specific message whose processing failed. nackCount is 1 the first time + // processing of that message fails, 2 the second time, and so on. + GetMessageErrorBackoff(nackCount int) time.Duration + + // GetNoMessagesBackoff returns how long to wait before querying the queue + // for new messages again. tick is 1 the first time in a row that the query + // returns no messages, 2 the second time, and so on. 
+ GetNoMessagesBackoff(tick int) time.Duration +} + type Message struct { uuid string payload []byte @@ -86,12 +99,17 @@ var ( ) type PubSub struct { - db *sql.DB - logger logging.Logger + backoffManager BackoffManager + db *sql.DB + logger logging.Logger } func NewPubSub(db *sql.DB, logger logging.Logger) *PubSub { - return &PubSub{db: db, logger: logger} + return &PubSub{ + backoffManager: NewDefaultBackoffManager(), + db: db, + logger: logger, + } } func (p *PubSub) InitializingQueries() []string { @@ -156,7 +174,7 @@ func (p *PubSub) subscribe(ctx context.Context, topic string, ch chan *ReceivedM if err != nil { if errors.Is(err, sql.ErrNoRows) { noMessagesCounter++ - backoff := getNoMessagesBackoff(noMessagesCounter) + backoff := p.backoffManager.GetNoMessagesBackoff(noMessagesCounter) p.logger.Trace(). WithField("duration", backoff). @@ -235,7 +253,7 @@ func (p *PubSub) nack(msg Message) error { } nackCount = nackCount + 1 - backoffDuration := getMessageErrorBackoff(nackCount) + backoffDuration := p.backoffManager.GetMessageErrorBackoff(nackCount) backoffUntil := time.Now().Add(backoffDuration) p.logger.Trace(). 
@@ -255,18 +273,58 @@ func (p *PubSub) nack(msg Message) error {
 	return nil
 }
 
-func getMessageErrorBackoff(nackCount int) time.Duration {
-	a := time.Duration(math.Pow(5, float64(nackCount-1))) * time.Second
-	b := 1 * time.Hour
-	return min(a, b)
+type executor interface {
+	Exec(query string, args ...any) (sql.Result, error)
 }
 
-func getNoMessagesBackoff(tick int) time.Duration {
-	a := time.Duration(math.Pow(2, float64(tick-1))) * time.Second
-	b := 30 * time.Second
-	return min(a, b)
+const (
+	// maxDefaultMessageErrorBackoff caps the delay before a failed message is
+	// processed again, before randomization is applied.
+	maxDefaultMessageErrorBackoff = 6 * time.Hour
+
+	// maxDefaultNoMessagesBackoff caps the delay between two queries for new
+	// messages.
+	maxDefaultNoMessagesBackoff = 30 * time.Second
+
+	// randomizeMessageErrorBackoffByFraction is the largest fraction by which
+	// a message error backoff is randomly shortened or extended.
+	randomizeMessageErrorBackoffByFraction = 0.1
+)
+
+// DefaultBackoffManager is the BackoffManager that NewPubSub installs. Both
+// backoffs grow exponentially until they reach a fixed limit.
+type DefaultBackoffManager struct {
 }
 
-type executor interface {
-	Exec(query string, args ...any) (sql.Result, error)
+// NewDefaultBackoffManager creates a DefaultBackoffManager.
+func NewDefaultBackoffManager() DefaultBackoffManager {
+	return DefaultBackoffManager{}
+}
+
+// GetMessageErrorBackoff returns 4^(nackCount-1) seconds, capped at
+// maxDefaultMessageErrorBackoff and then scaled by a random factor in
+// [1-randomizeMessageErrorBackoffByFraction, 1+randomizeMessageErrorBackoffByFraction).
+func (d DefaultBackoffManager) GetMessageErrorBackoff(nackCount int) time.Duration {
+	backoff := exponentialBackoff(4, nackCount, maxDefaultMessageErrorBackoff)
+	randFraction := 1 - randomizeMessageErrorBackoffByFraction + 2*randomizeMessageErrorBackoffByFraction*rand.Float64()
+	return time.Duration(float64(backoff) * randFraction)
+}
+
+// GetNoMessagesBackoff returns 2^(tick-1) seconds, capped at
+// maxDefaultNoMessagesBackoff.
+func (d DefaultBackoffManager) GetNoMessagesBackoff(tick int) time.Duration {
+	return exponentialBackoff(2, tick, maxDefaultNoMessagesBackoff)
+}
+
+// exponentialBackoff returns base^(attempt-1) seconds, capped at limit. The
+// cap is applied to the floating point value because converting a large power
+// to time.Duration and multiplying it by time.Second overflows int64, which
+// wraps around to negative or zero durations (with the bases used here, from
+// nackCount 18 and tick 35 onwards).
+func exponentialBackoff(base float64, attempt int, limit time.Duration) time.Duration {
+	seconds := math.Pow(base, float64(attempt-1))
+	if seconds >= limit.Seconds() {
+		return limit
+	}
+	return time.Duration(seconds) * time.Second
 }
diff --git a/service/adapters/sqlite/pubsub_test.go b/service/adapters/sqlite/pubsub_test.go
index d8fb097..a91f0f4 100644
--- a/service/adapters/sqlite/pubsub_test.go
+++ b/service/adapters/sqlite/pubsub_test.go
@@ -208,3 +208,43 @@ func TestPubSub_QueueLengthReportsNumberOfElementsInQueue(t *testing.T) {
 		assert.Equal(t, 1, n)
 	}, 10*time.Second, 100*time.Millisecond)
 }
+
+// The randomized backoff for a given nackCount must stay within 15% of the
+// running average of the previous samples for the same nackCount.
+func TestDefaultBackoffManager_GetMessageErrorBackoffStatisticallyFallsWithinCertainEpsilon(t *testing.T) {
+	const numSamples = 1000
+
+	m := sqlite.NewDefaultBackoffManager()
+	for i := 1; i < 10; i++ {
+		var sum float64
+		var avg float64
+
+		for samples := 0; samples < numSamples; samples++ {
+			backoff := float64(m.GetMessageErrorBackoff(i))
+			if samples > numSamples/2 {
+				require.InEpsilonf(t, avg, backoff, 0.15, "failed for i=%d and sample=%d", i, samples)
+			}
+
+			sum += backoff
+			// samples is zero-based, so samples+1 values have been summed.
+			avg = sum / float64(samples+1)
+		}
+
+		t.Log(i, time.Duration(avg))
+	}
+}
+
+// Counters large enough to overflow a naive base^(n-1)*time.Second computation
+// must still produce backoffs at the configured limits.
+func TestDefaultBackoffManager_BackoffsAreCappedForLargeCounters(t *testing.T) {
+	const (
+		maxMessageErrorBackoff = 6 * time.Hour
+		maxNoMessagesBackoff   = 30 * time.Second
+	)
+
+	m := sqlite.NewDefaultBackoffManager()
+	for _, n := range []int{18, 35, 64, 1000} {
+		require.Equalf(t, maxNoMessagesBackoff, m.GetNoMessagesBackoff(n), "failed for n=%d", n)
+		require.InEpsilonf(t, maxMessageErrorBackoff, m.GetMessageErrorBackoff(n), 0.11, "failed for n=%d", n)
+	}
+}