Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport5.4/refactor retry event at startup #2571

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions network/dag/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/nuts-foundation/nuts-node/jsonld"
"strings"
"time"

"github.com/avast/retry-go/v4"
Expand All @@ -37,7 +39,7 @@ import (
const (
defaultRetryDelay = time.Second
retriesFailedThreshold = 10
maxRetries = 100
maxRetries = 20
// TransactionEventType is used as Type in an Event when a transaction is added to the DAG.
TransactionEventType = "transaction"
// PayloadEventType is used as Type in an Event when a payload is written to the DB.
Expand Down Expand Up @@ -239,16 +241,38 @@ func (p *notifier) Run() error {
if !p.isPersistent() {
return nil
}
return p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
// we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop
failedAtStartup := make([]Event, 0)
err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
return reader.Iterate(func(k stoabs.Key, v []byte) error {
event := Event{}
_ = json.Unmarshal(v, &event)

p.retry(event)
// Do not retry events that previously failed on an unknown context. See https://github.com/nuts-foundation/nuts-node/issues/2569
if strings.HasSuffix(event.Error, jsonld.ContextURLNotAllowedErr.Error()) {
return nil
}

if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}

return nil
}, stoabs.BytesKey{})
})
if err != nil {
return err
}

// for all events from failedAtStartup, call retry
// this may still produce errors in the logs or even duplicate errors since notifyNow also failed
// but rather duplicate errors then errors produced from overloading the DB with transactions
for _, event := range failedAtStartup {
p.retry(event)
}
return nil
}

func (p *notifier) GetFailedEvents() (events []Event, err error) {
Expand Down Expand Up @@ -330,21 +354,24 @@ func (p *notifier) Notify(event Event) {
func (p *notifier) retry(event Event) {
delay := p.retryDelay
initialCount := event.Retries + 1
attempts := maxRetries - uint(initialCount)
if attempts <= 0 || attempts >= maxRetries {
return
}

for i := 0; i < initialCount; i++ {
delay *= 2
}

go func(ctx context.Context) {
// also an initial delay
time.Sleep(delay)
err := retry.Do(func() error {
return p.notifyNow(event)
},
retry.Attempts(maxRetries-uint(initialCount)),
retry.Attempts(attempts),
retry.MaxDelay(24*time.Hour),
retry.MaxJitter(p.retryDelay),
retry.Delay(delay),
retry.DelayType(retry.BackOffDelay),
retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)),
retry.Context(ctx),
retry.LastErrorOnly(true),
retry.OnRetry(func(n uint, err error) {
Expand Down
90 changes: 72 additions & 18 deletions network/dag/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/nuts-foundation/go-stoabs"
"github.com/nuts-foundation/go-stoabs/bbolt"
"github.com/nuts-foundation/nuts-node/crypto/hash"
"github.com/nuts-foundation/nuts-node/jsonld"
"github.com/nuts-foundation/nuts-node/storage"
"github.com/nuts-foundation/nuts-node/test"
"github.com/nuts-foundation/nuts-node/test/io"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -34,6 +36,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"path"
"runtime"
"strings"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -313,13 +317,14 @@ func TestNotifier_Notify(t *testing.T) {
timeFunc = func() time.Time {
return now
}
s := NewNotifier(t.Name(), counter.callback, WithPersistency(kvStore)).(*notifier)
s := NewNotifier(t.Name(), counter.callback, WithRetryDelay(2*time.Second), WithPersistency(kvStore)).(*notifier)
defer s.Close()
kvStore.Write(ctx, func(tx stoabs.WriteTx) error {
return s.Save(tx, event)
})

s.Notify(event)
// we use retry here since Notify will run notifyNow twice asynchronously
s.retry(event)

test.WaitFor(t, func() (bool, error) {
return counter.N.Load() == 1, nil
Expand Down Expand Up @@ -360,11 +365,7 @@ func TestNotifier_Notify(t *testing.T) {
}

func TestNotifier_Run(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
transaction, _, _ := CreateTestTransaction(0)
kvStore, _ := bbolt.CreateBBoltStore(path.Join(filePath, "test.db"))
counter := callbackCounter{}
payload := "payload"
event := Event{
Type: TransactionEventType,
Expand All @@ -373,18 +374,71 @@ func TestNotifier_Run(t *testing.T) {
Transaction: transaction,
Payload: []byte(payload),
}
s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)

_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
t.Run("OK - callback called", func(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db"))
counter := callbackCounter{}
s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)
defer s.Close()

_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
})

err := s.Run()
require.NoError(t, err)

test.WaitFor(t, func() (bool, error) {
return counter.N.Load() == 1, nil
}, time.Second, "timeout while waiting for receiver")
})

s.Run()
t.Run("OK - callback errors", func(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db"))
counter := callbackCounter{}
counter.setCallbackError(errors.New("error"))
s := NewNotifier(t.Name(), counter.callbackFailure, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)
defer s.Close()

test.WaitFor(t, func() (bool, error) {
return counter.N.Load() == 1, nil
}, time.Second, "timeout while waiting for receiver")
_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
})

err := s.Run()
require.NoError(t, err)

stack := make([]byte, 4*1024)
test.WaitFor(t, func() (bool, error) {
runtime.Stack(stack, true)
index := strings.Index(string(stack), "dag.(*notifier).retry.func1")
return index != -1, nil
}, time.Second, "timeout while waiting for go routine to start")
})

t.Run("OK - does not retry unknown context errors", func(t *testing.T) {
ctx := context.Background()
filePath := io.TestDirectory(t)
kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db"))
counter := callbackCounter{}
s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier)
defer s.Close()

event := Event{Error: "some error: " + jsonld.ContextURLNotAllowedErr.Error()}
_ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error {
bytes, _ := json.Marshal(event)
return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes)
})

err := s.Run()
require.NoError(t, err)
assert.Equal(t, int64(0), counter.N.Load())
})
}

func TestNotifier_VariousFlows(t *testing.T) {
Expand Down Expand Up @@ -447,7 +501,7 @@ func TestNotifier_VariousFlows(t *testing.T) {
kvStore, _ := bbolt.CreateBBoltStore(path.Join(filePath, "test.db"))
counter := callbackCounter{}
notifiedCounter := &prometheusCounter{}
event := Event{Hash: hash.EmptyHash(), Transaction: transaction, Retries: 95}
event := Event{Hash: hash.EmptyHash(), Transaction: transaction, Retries: maxRetries - 5}
s := NewNotifier(t.Name(), counter.callbackFailure, WithPersistency(kvStore), WithRetryDelay(time.Nanosecond), withCounters(notifiedCounter, nil)).(*notifier)
defer s.Close()

Expand All @@ -464,8 +518,8 @@ func TestNotifier_VariousFlows(t *testing.T) {
return nil
})

return e.Retries == 100, nil
}, time.Second, "timeout while waiting for receiver")
return e.Retries == maxRetries, nil
}, 2*time.Second, "timeout while waiting for receiver")

events, err := s.GetFailedEvents()

Expand Down Expand Up @@ -532,7 +586,7 @@ func TestNotifier_VariousFlows(t *testing.T) {
return nil
})
return e.Retries >= maxRetries, nil
}, time.Second, "timeout while waiting for receiver")
}, 5*time.Second, "timeout while waiting for receiver")

events, err := s.GetFailedEvents()

Expand Down