Skip to content

Commit

Permalink
Merge branch 'master' into mitzXnuts
Browse files Browse the repository at this point in the history
* master:
  Bump golang from 1.21.6-alpine to 1.22.0-alpine (#2786)
  Bump golang.org/x/crypto from 0.18.0 to 0.19.0 (#2787)
  VDR: Fix resolving keys for did:web DIDs with port (#2779)
  Bump gorm.io/driver/postgres from 1.5.4 to 1.5.6 (#2781)
  Discovery: 1 background process for refreshing Discovery Service (#2768)
  • Loading branch information
rolandgroen committed Feb 8, 2024
2 parents 5e4c06f + e8ba17e commit 0f3c382
Show file tree
Hide file tree
Showing 12 changed files with 160 additions and 146 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# golang alpine
FROM golang:1.21.6-alpine as builder
FROM golang:1.22.0-alpine as builder

ARG TARGETARCH
ARG TARGETOS
Expand Down
68 changes: 16 additions & 52 deletions discovery/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"fmt"
"github.com/nuts-foundation/go-did/did"
"github.com/nuts-foundation/go-did/vc"
"github.com/nuts-foundation/nuts-node/audit"
nutsCrypto "github.com/nuts-foundation/nuts-node/crypto"
"github.com/nuts-foundation/nuts-node/discovery/api/v1/client"
"github.com/nuts-foundation/nuts-node/discovery/log"
Expand All @@ -35,14 +34,12 @@ import (
)

// clientRegistrationManager is a client component, responsible for managing registrations on a Discovery Service.
// It automatically refreshes registered Verifiable Presentations when they are about to expire.
// It can refresh registered Verifiable Presentations when they are about to expire.
type clientRegistrationManager interface {
activate(ctx context.Context, serviceID string, subjectDID did.DID) error
deactivate(ctx context.Context, serviceID string, subjectDID did.DID) error
// refresh is a blocking call to periodically refresh registrations.
// It checks for Verifiable Presentations that are about to expire, and should be refreshed on the Discovery Service.
// It will exit when the given context is cancelled.
refresh(ctx context.Context, interval time.Duration)
// refresh checks which Verifiable Presentations that are about to expire, and should be refreshed on the Discovery Service.
refresh(ctx context.Context, now time.Time) error
}

var _ clientRegistrationManager = &defaultClientRegistrationManager{}
Expand Down Expand Up @@ -77,7 +74,7 @@ func (r *defaultClientRegistrationManager) activate(ctx context.Context, service
err := r.registerPresentation(ctx, subjectDID, service)
if err != nil {
// failed, will be retried on next scheduled refresh
return errors.Join(ErrPresentationRegistrationFailed, err)
return fmt.Errorf("%w: %w", ErrPresentationRegistrationFailed, err)
}
log.Logger().Debugf("Successfully registered Verifiable Presentation on Discovery Service (service=%s, did=%s)", serviceID, subjectDID)

Expand All @@ -102,7 +99,7 @@ func (r *defaultClientRegistrationManager) deactivate(ctx context.Context, servi
"credentialSubject.id": subjectDID.String(),
})
if err != nil {
return errors.Join(ErrPresentationRegistrationFailed, err)
return fmt.Errorf("%w: %w", ErrPresentationRegistrationFailed, err)
}
if len(presentations) == 0 {
// no registration, nothing to do
Expand All @@ -114,11 +111,11 @@ func (r *defaultClientRegistrationManager) deactivate(ctx context.Context, servi
"retract_jti": presentations[0].ID.String(),
})
if err != nil {
return errors.Join(ErrPresentationRegistrationFailed, err)
return fmt.Errorf("%w: %w", ErrPresentationRegistrationFailed, err)
}
err = r.client.Register(ctx, service.Endpoint, *presentation)
if err != nil {
return errors.Join(ErrPresentationRegistrationFailed, err)
return fmt.Errorf("%w: %w", ErrPresentationRegistrationFailed, err)
}
return nil
}
Expand Down Expand Up @@ -163,41 +160,22 @@ func (r *defaultClientRegistrationManager) buildPresentation(ctx context.Context
}, &subjectDID, false)
}

func (r *defaultClientRegistrationManager) doRefresh(ctx context.Context, now time.Time) error {
func (r *defaultClientRegistrationManager) refresh(ctx context.Context, now time.Time) error {
log.Logger().Debug("Refreshing own registered Verifiable Presentations on Discovery Services")
serviceIDs, dids, err := r.store.getPresentationsToBeRefreshed(now)
if err != nil {
return err
}
var result error = nil
for i, serviceID := range serviceIDs {
if err := r.activate(ctx, serviceID, dids[i]); err != nil {
log.Logger().WithError(err).Warnf("Failed to refresh Verifiable Presentation (service=%s, did=%s)", serviceID, dids[i])
result = errors.Join(result, fmt.Errorf("failed to refresh Verifiable Presentation (service=%s, did=%s): %w", serviceID, dids[i], err))
}
}
return nil
return result
}

func (r *defaultClientRegistrationManager) refresh(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
// do the first refresh immediately
do := func() {
if err := r.doRefresh(audit.Context(ctx, "app", ModuleName, "RefreshVerifiablePresentations"), time.Now()); err != nil {
log.Logger().WithError(err).Errorf("Failed to refresh Verifiable Presentations")
}
}
do()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
do()
}
}
}

// clientUpdater is responsible for updating the presentations for the given services, at the given interval.
// clientUpdater is responsible for updating the local copy of Discovery Services
// Callers should only call update().
type clientUpdater struct {
services map[string]ServiceDefinition
Expand All @@ -215,29 +193,15 @@ func newClientUpdater(services map[string]ServiceDefinition, store *sqlStore, ve
}
}

// update starts a blocking loop that updates the presentations for the given services, at the given interval.
// It returns when the context is cancelled.
func (u *clientUpdater) update(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
u.doUpdate(ctx)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
u.doUpdate(ctx)
}
}
}

func (u *clientUpdater) doUpdate(ctx context.Context) {
func (u *clientUpdater) update(ctx context.Context) error {
log.Logger().Debug("Checking for new Verifiable Presentations from Discovery Services")
var result error = nil
for _, service := range u.services {
if err := u.updateService(ctx, service); err != nil {
log.Logger().Errorf("Failed to update service (id=%s): %s", service.ID, err)
result = errors.Join(result, err)
}
}
return result
}

func (u *clientUpdater) updateService(ctx context.Context, service ServiceDefinition) error {
Expand Down
73 changes: 17 additions & 56 deletions discovery/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -150,7 +149,7 @@ func Test_scheduledRegistrationManager_deregister(t *testing.T) {
})
}

func Test_scheduledRegistrationManager_doRefreshRegistrations(t *testing.T) {
func Test_scheduledRegistrationManager_refresh(t *testing.T) {
storageEngine := storage.NewTestStorageEngine(t)
require.NoError(t, storageEngine.Start())

Expand All @@ -161,7 +160,7 @@ func Test_scheduledRegistrationManager_doRefreshRegistrations(t *testing.T) {
store := setupStore(t, storageEngine.GetSQLDatabase())
manager := newRegistrationManager(testDefinitions(), store, invoker, mockVCR)

err := manager.doRefresh(audit.TestContext(), time.Now())
err := manager.refresh(audit.TestContext(), time.Now())

require.NoError(t, err)
})
Expand All @@ -186,37 +185,9 @@ func Test_scheduledRegistrationManager_doRefreshRegistrations(t *testing.T) {
wallet.EXPECT().BuildPresentation(gomock.Any(), gomock.Any(), gomock.Any(), &bobDID, false).Return(&vpBob, nil)
wallet.EXPECT().List(gomock.Any(), bobDID).Return([]vc.VerifiableCredential{vcBob}, nil)

err := manager.doRefresh(audit.TestContext(), time.Now())
err := manager.refresh(audit.TestContext(), time.Now())

require.NoError(t, err)
})
}

func Test_scheduledRegistrationManager_refreshRegistrations(t *testing.T) {
storageEngine := storage.NewTestStorageEngine(t)
require.NoError(t, storageEngine.Start())

t.Run("context cancel stops the loop", func(t *testing.T) {
store := setupStore(t, storageEngine.GetSQLDatabase())
ctrl := gomock.NewController(t)
invoker := client.NewMockHTTPClient(ctrl)
mockVCR := vcr.NewMockVCR(ctrl)
wallet := holder.NewMockWallet(ctrl)
mockVCR.EXPECT().Wallet().Return(wallet).AnyTimes()
manager := newRegistrationManager(testDefinitions(), store, invoker, mockVCR)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
manager.refresh(ctx, time.Millisecond)
}()
// make sure the loop has at least once
time.Sleep(5 * time.Millisecond)
// Make sure the function exits when the context is cancelled
cancel()
wg.Wait()
assert.EqualError(t, err, "failed to refresh Verifiable Presentation (service=usecase_v1, did=did:example:alice): registration of Verifiable Presentation on remote Discovery Service failed: remote error")
})
}

Expand Down Expand Up @@ -294,42 +265,32 @@ func Test_clientUpdater_updateService(t *testing.T) {
}

func Test_clientUpdater_update(t *testing.T) {
storageEngine := storage.NewTestStorageEngine(t)
require.NoError(t, storageEngine.Start())
t.Run("context cancel stops the loop", func(t *testing.T) {
t.Run("proceeds when service update fails", func(t *testing.T) {
storageEngine := storage.NewTestStorageEngine(t)
require.NoError(t, storageEngine.Start())
store := setupStore(t, storageEngine.GetSQLDatabase())
ctrl := gomock.NewController(t)
httpClient := client.NewMockHTTPClient(ctrl)
httpClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]vc.VerifiablePresentation{}, "test", nil).MinTimes(1)
httpClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]vc.VerifiablePresentation{}, "test", nil)
httpClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, "", errors.New("test"))
updater := newClientUpdater(testDefinitions(), store, alwaysOkVerifier, httpClient)

ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
updater.update(ctx, time.Millisecond)
}()
// make sure the loop has at least once
time.Sleep(5 * time.Millisecond)
// Make sure the function exits when the context is cancelled
cancel()
wg.Wait()
})
}
err := updater.update(context.Background())

func Test_clientUpdater_doUpdate(t *testing.T) {
t.Run("proceeds when service update fails", func(t *testing.T) {
require.EqualError(t, err, "failed to get presentations from discovery service (id=other): test")
})
t.Run("no error", func(t *testing.T) {
storageEngine := storage.NewTestStorageEngine(t)
require.NoError(t, storageEngine.Start())
store := setupStore(t, storageEngine.GetSQLDatabase())
ctrl := gomock.NewController(t)
httpClient := client.NewMockHTTPClient(ctrl)
httpClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]vc.VerifiablePresentation{}, "test", nil)
httpClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, "", errors.New("test"))
httpClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return([]vc.VerifiablePresentation{}, "test", nil).MinTimes(2)
updater := newClientUpdater(testDefinitions(), store, alwaysOkVerifier, httpClient)

updater.doUpdate(context.Background())
err := updater.update(context.Background())

assert.NoError(t, err)
})
}

Expand Down
11 changes: 5 additions & 6 deletions discovery/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ func FlagSet() *pflag.FlagSet {
flagSet.StringSlice("discovery.server.definition_ids", defs.Server.DefinitionIDs,
"IDs of the Discovery Service Definitions for which to act as server. "+
"If an ID does not map to a loaded service definition, the node will fail to start.")
flagSet.Duration("discovery.client.registration_refresh_interval", defs.Client.RegistrationRefreshInterval,
"Interval at which the client should refresh checks for registrations to refresh on the configured Discovery Services,"+
"in Golang time.Duration string format (e.g. 1s). "+
"Note that it only will actually refresh registrations that about to expire (less than 1/4th of their lifetime left).")
flagSet.Duration("discovery.client.refresh_interval", defs.Client.RefreshInterval, "How often to check for new Verifiable Presentations on the Discovery Services to update the local copy. "+
"Specified as Golang duration (e.g. 1m, 1h30m).")
flagSet.Duration("discovery.client.refresh_interval", defs.Client.RefreshInterval,
"Interval at which the client synchronizes with the Discovery Server; "+
"refreshing Verifiable Presentations of local DIDs and loading changes, updating the local copy. "+
"It only will actually refresh registrations of local DIDs that about to expire (less than 1/4th of their lifetime left). "+
"Specified as Golang duration (e.g. 1m, 1h30m).")
return flagSet
}
6 changes: 1 addition & 5 deletions discovery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,14 @@ type ServerConfig struct {
type ClientConfig struct {
// RefreshInterval specifies how often the client should refresh the Discovery Services.
RefreshInterval time.Duration `koanf:"refresh_interval"`
// RegistrationRefreshInterval specifies how often the client should refresh its registrations on Discovery Services.
// At the same interval, failed registrations are refreshed.
RegistrationRefreshInterval time.Duration `koanf:"registration_refresh_interval"`
}

// DefaultConfig returns the default configuration.
func DefaultConfig() Config {
return Config{
Server: ServerConfig{},
Client: ClientConfig{
RefreshInterval: 10 * time.Minute,
RegistrationRefreshInterval: 10 * time.Minute,
RefreshInterval: 10 * time.Minute,
},
}
}
2 changes: 1 addition & 1 deletion discovery/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ import (
)

func TestDefaultConfig(t *testing.T) {
assert.NotEmpty(t, DefaultConfig().Client.RegistrationRefreshInterval)
assert.NotEmpty(t, DefaultConfig().Client.RefreshInterval)
}
45 changes: 35 additions & 10 deletions discovery/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
ssi "github.com/nuts-foundation/go-did"
"github.com/nuts-foundation/go-did/did"
"github.com/nuts-foundation/go-did/vc"
"github.com/nuts-foundation/nuts-node/audit"
"github.com/nuts-foundation/nuts-node/core"
"github.com/nuts-foundation/nuts-node/discovery/api/v1/client"
"github.com/nuts-foundation/nuts-node/discovery/log"
Expand Down Expand Up @@ -128,17 +129,14 @@ func (m *Module) Start() error {
return err
}
m.clientUpdater = newClientUpdater(m.allDefinitions, m.store, m.verifyRegistration, m.httpClient)
m.routines.Add(1)
go func() {
defer m.routines.Done()
m.clientUpdater.update(m.ctx, m.config.Client.RefreshInterval)
}()
m.registrationManager = newRegistrationManager(m.allDefinitions, m.store, m.httpClient, m.vcrInstance)
m.routines.Add(1)
go func() {
defer m.routines.Done()
m.registrationManager.refresh(m.ctx, m.config.Client.RegistrationRefreshInterval)
}()
if m.config.Client.RefreshInterval > 0 {
m.routines.Add(1)
go func() {
defer m.routines.Done()
m.update()
}()
}
return nil
}

Expand Down Expand Up @@ -288,6 +286,7 @@ func (m *Module) ActivateServiceForDID(ctx context.Context, serviceID string, su
log.Logger().WithError(err).Warnf("Presentation registration failed, will be retried later (did=%s,service=%s)", subjectDID, serviceID)
} else if err == nil {
log.Logger().Infof("Successfully activated service for DID (did=%s,service=%s)", subjectDID, serviceID)
_ = m.clientUpdater.updateService(ctx, m.allDefinitions[serviceID])
}
return err
}
Expand Down Expand Up @@ -394,6 +393,32 @@ func (m *Module) Search(serviceID string, query map[string]string) ([]SearchResu
return result, nil
}

func (m *Module) update() {
ticker := time.NewTicker(m.config.Client.RefreshInterval)
defer ticker.Stop()
ctx := audit.Context(m.ctx, "app", ModuleName, "RefreshDiscoveryClient")
do := func() {
// Refresh registrations first, to make sure we have (our own) latest presentations when we load them from the Discovery Service
err := m.registrationManager.refresh(ctx, time.Now())
if err != nil {
log.Logger().WithError(err).Errorf("Failed to refresh own Verifiable Presentations on Discovery Service")
}
err = m.clientUpdater.update(m.ctx)
if err != nil {
log.Logger().WithError(err).Errorf("Failed to load latest Verifiable Presentations from Discovery Service")
}
}
do()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
do()
}
}
}

// validateAudience checks if the given audience of the presentation matches the service ID.
func validateAudience(service ServiceDefinition, audience []string) error {
for _, audienceID := range audience {
Expand Down
Loading

0 comments on commit 0f3c382

Please sign in to comment.