diff --git a/network/dag/notifier.go b/network/dag/notifier.go index bce88328cd..675b3882bf 100644 --- a/network/dag/notifier.go +++ b/network/dag/notifier.go @@ -23,6 +23,8 @@ import ( "encoding/json" "errors" "fmt" + "github.com/nuts-foundation/nuts-node/jsonld" + "strings" "time" "github.com/avast/retry-go/v4" @@ -37,7 +39,7 @@ import ( const ( defaultRetryDelay = time.Second retriesFailedThreshold = 10 - maxRetries = 100 + maxRetries = 20 // TransactionEventType is used as Type in an Event when a transaction is added to the DAG. TransactionEventType = "transaction" // PayloadEventType is used as Type in an Event when a payload is written to the DB. @@ -239,16 +241,38 @@ func (p *notifier) Run() error { if !p.isPersistent() { return nil } - return p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error { + // we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop + failedAtStartup := make([]Event, 0) + err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error { return reader.Iterate(func(k stoabs.Key, v []byte) error { event := Event{} _ = json.Unmarshal(v, &event) - p.retry(event) + // Do not retry events that previously failed on an unknown context. See https://github.com/nuts-foundation/nuts-node/issues/2569 + if strings.HasSuffix(event.Error, jsonld.ContextURLNotAllowedErr.Error()) { + return nil + } + + if err := p.notifyNow(event); err != nil { + if event.Retries < maxRetries { + failedAtStartup = append(failedAtStartup, event) + } + } return nil }, stoabs.BytesKey{}) }) + if err != nil { + return err + } + + // for all events from failedAtStartup, call retry + // this may still produce errors in the logs or even duplicate errors since notifyNow also failed + // but rather duplicate errors then errors produced from overloading the DB with transactions + for _, event := range failedAtStartup { + p.retry(event) + } + return nil } func (p *notifier) GetFailedEvents() (events []Event, err error) { @@ -330,21 +354,24 @@ func (p *notifier) Notify(event Event) { func (p *notifier) retry(event Event) { delay := p.retryDelay initialCount := event.Retries + 1 + attempts := maxRetries - uint(initialCount) + if attempts <= 0 || attempts >= maxRetries { + return + } for i := 0; i < initialCount; i++ { delay *= 2 } go func(ctx context.Context) { - // also an initial delay - time.Sleep(delay) err := retry.Do(func() error { return p.notifyNow(event) }, - retry.Attempts(maxRetries-uint(initialCount)), + retry.Attempts(attempts), retry.MaxDelay(24*time.Hour), + retry.MaxJitter(p.retryDelay), retry.Delay(delay), - retry.DelayType(retry.BackOffDelay), + retry.DelayType(retry.CombineDelay(retry.BackOffDelay, retry.RandomDelay)), retry.Context(ctx), retry.LastErrorOnly(true), retry.OnRetry(func(n uint, err error) { diff --git a/network/dag/notifier_test.go b/network/dag/notifier_test.go index aab56d505c..146d5d904b 100644 --- a/network/dag/notifier_test.go +++ b/network/dag/notifier_test.go @@ -26,6 +26,8 @@ import ( "github.com/nuts-foundation/go-stoabs" "github.com/nuts-foundation/go-stoabs/bbolt" "github.com/nuts-foundation/nuts-node/crypto/hash" + "github.com/nuts-foundation/nuts-node/jsonld" + "github.com/nuts-foundation/nuts-node/storage" "github.com/nuts-foundation/nuts-node/test" "github.com/nuts-foundation/nuts-node/test/io" "github.com/prometheus/client_golang/prometheus" @@ -34,6 +36,8 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "path" + "runtime" + "strings" "sync/atomic" "testing" "time" @@ -313,13 +317,14 @@ func TestNotifier_Notify(t *testing.T) { timeFunc = func() time.Time { return now } - s := NewNotifier(t.Name(), counter.callback, WithPersistency(kvStore)).(*notifier) + s := NewNotifier(t.Name(), counter.callback, WithRetryDelay(2*time.Second), WithPersistency(kvStore)).(*notifier) defer s.Close() kvStore.Write(ctx, func(tx stoabs.WriteTx) error { return s.Save(tx, event) }) - s.Notify(event) + // we use retry here since Notify will run notifyNow twice asynchronously + s.retry(event) test.WaitFor(t, func() (bool, error) { return counter.N.Load() == 1, nil @@ -360,11 +365,7 @@ func TestNotifier_Notify(t *testing.T) { } func TestNotifier_Run(t *testing.T) { - ctx := context.Background() - filePath := io.TestDirectory(t) transaction, _, _ := CreateTestTransaction(0) - kvStore, _ := bbolt.CreateBBoltStore(path.Join(filePath, "test.db")) - counter := callbackCounter{} payload := "payload" event := Event{ Type: TransactionEventType, @@ -373,18 +374,71 @@ func TestNotifier_Run(t *testing.T) { Transaction: transaction, Payload: []byte(payload), } - s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier) - _ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error { - bytes, _ := json.Marshal(event) - return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes) + t.Run("OK - callback called", func(t *testing.T) { + ctx := context.Background() + filePath := io.TestDirectory(t) + kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db")) + counter := callbackCounter{} + s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier) + defer s.Close() + + _ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error { + bytes, _ := json.Marshal(event) + return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes) + }) + + err := s.Run() + require.NoError(t, err) + + test.WaitFor(t, func() (bool, error) { + return counter.N.Load() == 1, nil + }, time.Second, "timeout while waiting for receiver") }) - s.Run() + t.Run("OK - callback errors", func(t *testing.T) { + ctx := context.Background() + filePath := io.TestDirectory(t) + kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db")) + counter := callbackCounter{} + counter.setCallbackError(errors.New("error")) + s := NewNotifier(t.Name(), counter.callbackFailure, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier) + defer s.Close() - test.WaitFor(t, func() (bool, error) { - return counter.N.Load() == 1, nil - }, time.Second, "timeout while waiting for receiver") + _ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error { + bytes, _ := json.Marshal(event) + return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes) + }) + + err := s.Run() + require.NoError(t, err) + + stack := make([]byte, 4*1024) + test.WaitFor(t, func() (bool, error) { + runtime.Stack(stack, true) + index := strings.Index(string(stack), "dag.(*notifier).retry.func1") + return index != -1, nil + }, time.Second, "timeout while waiting for go routine to start") + }) + + t.Run("OK - does not retry unknown context errors", func(t *testing.T) { + ctx := context.Background() + filePath := io.TestDirectory(t) + kvStore := storage.CreateTestBBoltStore(t, path.Join(filePath, "test.db")) + counter := callbackCounter{} + s := NewNotifier(t.Name(), counter.callbackFinished, WithPersistency(kvStore), WithRetryDelay(time.Millisecond)).(*notifier) + defer s.Close() + + event := Event{Error: "some error: " + jsonld.ContextURLNotAllowedErr.Error()} + _ = kvStore.WriteShelf(ctx, s.shelfName(), func(writer stoabs.Writer) error { + bytes, _ := json.Marshal(event) + return writer.Put(stoabs.BytesKey(event.Hash.Slice()), bytes) + }) + + err := s.Run() + require.NoError(t, err) + assert.Equal(t, int64(0), counter.N.Load()) + }) } func TestNotifier_VariousFlows(t *testing.T) { @@ -447,7 +501,7 @@ func TestNotifier_VariousFlows(t *testing.T) { kvStore, _ := bbolt.CreateBBoltStore(path.Join(filePath, "test.db")) counter := callbackCounter{} notifiedCounter := &prometheusCounter{} - event := Event{Hash: hash.EmptyHash(), Transaction: transaction, Retries: 95} + event := Event{Hash: hash.EmptyHash(), Transaction: transaction, Retries: maxRetries - 5} s := NewNotifier(t.Name(), counter.callbackFailure, WithPersistency(kvStore), WithRetryDelay(time.Nanosecond), withCounters(notifiedCounter, nil)).(*notifier) defer s.Close() @@ -464,8 +518,8 @@ func TestNotifier_VariousFlows(t *testing.T) { return nil }) - return e.Retries == 100, nil - }, time.Second, "timeout while waiting for receiver") + return e.Retries == maxRetries, nil + }, 2*time.Second, "timeout while waiting for receiver") events, err := s.GetFailedEvents() @@ -532,7 +586,7 @@ func TestNotifier_VariousFlows(t *testing.T) { return nil }) return e.Retries >= maxRetries, nil - }, time.Second, "timeout while waiting for receiver") + }, 5*time.Second, "timeout while waiting for receiver") events, err := s.GetFailedEvents()