Skip to content

Commit

Permalink
Backport5.4/refactor retry event at startup (#2571)
Browse files Browse the repository at this point in the history
* Fix/2402/retry events synchronously at startup (#2414)

* notify subscribers synchronously on startup
* fix race condition in test, jitter now equals initialDelay. Reduced max retry count to 20 (eliminates overflow)
* remove initial delay from retry
* no retry when attempts <= 0 or >= max

* ignore jsonld.ContextURLNotAllowedErr errors at startup (#2570)

---------

Co-authored-by: Wout Slakhorst <[email protected]>
  • Loading branch information
gerardsn and woutslakhorst authored Oct 27, 2023
1 parent f14cbf7 commit 1f36f80
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 25 deletions.
41 changes: 34 additions & 7 deletions network/dag/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/nuts-foundation/nuts-node/jsonld"
"strings"
"time"

"github.com/avast/retry-go/v4"
Expand All @@ -37,7 +39,7 @@ import (
const (
defaultRetryDelay = time.Second
retriesFailedThreshold = 10
maxRetries = 100
maxRetries = 20
// TransactionEventType is used as Type in an Event when a transaction is added to the DAG.
TransactionEventType = "transaction"
// PayloadEventType is used as Type in an Event when a payload is written to the DB.
Expand Down Expand Up @@ -239,16 +241,38 @@ func (p *notifier) Run() error {
if !p.isPersistent() {
return nil
}
return p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
// we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop
failedAtStartup := make([]Event, 0)
err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
return reader.Iterate(func(k stoabs.Key, v []byte) error {
event := Event{}
_ = json.Unmarshal(v, &event)

p.retry(event)
// Do not retry events that previously failed on an unknown context. See https://github.com/nuts-foundation/nuts-node/issues/2569
if strings.HasSuffix(event.Error, jsonld.ContextURLNotAllowedErr.Error()) {
return nil
}

if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}

return nil
}, stoabs.BytesKey{})
})
if err != nil {
return err
}

// for all events from failedAtStartup, call retry
// this may still produce errors in the logs or even duplicate errors since notifyNow also failed
// but rather duplicate errors then errors produced from overloading the DB with transactions
for _, event := range failedAtStartup {
p.retry(event)
}
return nil
}

func (p *notifier) GetFailedEvents() (events []Event, err error) {
Expand Down Expand Up @@ -330,21 +354,24 @@ func (p *notifier) Notify(event Event) {
func (p *notifier) retry(event Event) {
delay := p.retryDelay
initialCount := event.Retries + 1
attempts := maxRetries - uint(initialCount)
if attempts <= 0 || attempts >= maxRetries {
return
}

for i := 0; i < initialCount; i++ {
delay *= 2
}

go func(ctx context.Context) {
// also an initial delay
time.Sleep(delay)
err := retry.Do(func() error {
return p.notifyNow(event)
},
retry.Attempts(maxRetries-uint(initialCount)),
retry.Attempts(attempts),
retry.MaxDelay(24*time.Hour),
retry.MaxJitter(p.retryDelay),
retry.Delay(delay),
retry.DelayType(retry.BackOffDelay),
retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.OnRetry(func(n uint, err error) {
Expand Down
90 changes: 72 additions & 18 deletions network/dag/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/nuts-foundation/go-stoabs"
"github.com/nuts-foundation/go-stoabs/bbolt"
"github.com/nuts-foundation/nuts-node/crypto/hash"
"github.com/nuts-foundation/nuts-node/jsonld"
"github.com/nuts-foundation/nuts-node/storage"
"github.com/nuts-foundation/nuts-node/test"
"github.com/nuts-foundation/nuts-node/test/io"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -34,6 +36,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"path"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -313,13 +317,14 @@ func TestNotifier_Notify(t *testing.T) {
timeFunc = func() time.Time {
return now
}
s := NewNotifier(t.Name(), counter.callback, WithPersistency(kvStore)).(*notifier)
s := NewNotifier(t.Name(), counter.callback, WithRetryDelay(2*time.Second), WithPersistency(kvStore)).(*notifier)
defer s.Close()
kvStore.Write(ctx, func(tx stoabs.WriteTx) error {
return s.Save(tx, event)
})

s.Notify(event)
// we use retry here since Notify will run notifyNow twice asynchronously
s.retry(event)

test.WaitFor(t, func() (bool, error) {
return counter.N.Load() == 1, nil
Expand Down Expand Up @@ -360,11 +365,7 @@ func TestNotifier_Notify(t *testing.T) {
}

func TestNotifier_Run(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
transaction, _, _ := CreateTestTransaction(0)
kvStore, _ := bbolt.CreateBBoltStore(path.Join(filePath, "test.db"))
counter := callbackCounter{}
payload := "payload"
event := Event{
Type: TransactionEventType,
Expand All @@ -373,18 +374,71 @@ func TestNotifier_Run(t *testing.T) {
Transaction: transaction,
Payload: []byte(payload),
}
s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)

_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
t.Run("OK - callback called", func(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db"))
counter := callbackCounter{}
s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)
defer s.Close()

_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
})

err := s.Run()
require.NoError(t, err)

test.WaitFor(t, func() (bool, error) {
return counter.N.Load() == 1, nil
}, time.Second, "timeout while waiting for receiver")
})

s.Run()
t.Run("OK - callback errors", func(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db"))
counter := callbackCounter{}
counter.setCallbackError(errors.New("error"))
s := NewNotifier(t.Name(), counter.callbackFailure, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)
defer s.Close()

test.WaitFor(t, func() (bool, error) {
return counter.N.Load() == 1, nil
}, time.Second, "timeout while waiting for receiver")
_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
})

err := s.Run()
require.NoError(t, err)

stack := make([]byte, 4*1024)
test.WaitFor(t, func() (bool, error) {
runtime.Stack(stack, true)
index := strings.Index(string(stack), "dag.(*notifier).retry.func1")
return index != -1, nil
}, time.Second, "timeout while waiting for go routine to start")
})

t.Run("OK - does not retry unknown context errors", func(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db"))
counter := callbackCounter{}
s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)
defer s.Close()

event := Event{Error: "some error: " + jsonld.ContextURLNotAllowedErr.Error()}
_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
})

err := s.Run()
require.NoError(t, err)
assert.Equal(t, int64(0), counter.N.Load())
})
}

func TestNotifier_VariousFlows(t *testing.T) {
Expand Down Expand Up @@ -447,7 +501,7 @@ func TestNotifier_VariousFlows(t *testing.T) {
kvStore, _ := bbolt.CreateBBoltStore(path.Join(filePath, "test.db"))
counter := callbackCounter{}
notifiedCounter := &prometheusCounter{}
event := Event{Hash: hash.EmptyHash(), Transaction: transaction, Retries: 95}
event := Event{Hash: hash.EmptyHash(), Transaction: transaction, Retries: maxRetries - 5}
s := NewNotifier(t.Name(), counter.callbackFailure, WithPersistency(kvStore), WithRetryDelay(time.Nanosecond), withCounters(notifiedCounter, nil)).(*notifier)
defer s.Close()

Expand All @@ -464,8 +518,8 @@ func TestNotifier_VariousFlows(t *testing.T) {
return nil
})

return e.Retries == 100, nil
}, time.Second, "timeout while waiting for receiver")
return e.Retries == maxRetries, nil
}, 2*time.Second, "timeout while waiting for receiver")

events, err := s.GetFailedEvents()

Expand Down Expand Up @@ -532,7 +586,7 @@ func TestNotifier_VariousFlows(t *testing.T) {
return nil
})
return e.Retries >= maxRetries, nil
}, time.Second, "timeout while waiting for receiver")
}, 5*time.Second, "timeout while waiting for receiver")

events, err := s.GetFailedEvents()

Expand Down

0 comments on commit 1f36f80

Please sign in to comment.