From 67b411efe0becae68d1c69bab8b0bc00c9b6378c Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Fri, 6 Dec 2024 10:57:28 -0800 Subject: [PATCH] refactor: add internal data system config (#185) This PR represents a major internal refactoring of the FDv2 data system components. Along with better support for the FDv2 protocol, it adds support for FDv2 data source contract tests as well. Missing pieces include an updated Data Source Status monitoring API, as well as support for dual synchronizers. --------- Co-authored-by: Matthew M. Keeler --- config.go | 20 + internal/datasourcev2/polling_data_source.go | 40 +- internal/datasourcev2/polling_http_request.go | 121 ++---- internal/datasourcev2/polling_response.go | 48 --- .../datasourcev2/streaming_data_source.go | 198 ++++------ .../data_store_status_provider_impl.go | 21 +- internal/datasystem/fdv2_datasystem.go | 343 ++++++++++++++++++ internal/datasystem/store.go | 20 +- internal/datasystem/store_test.go | 80 ++-- internal/fdv2proto/changeset.go | 120 ++++++ internal/fdv2proto/changeset_test.go | 82 +++++ .../fdv2proto/deltas_to_storable_items.go | 59 +++ internal/fdv2proto/event_to_storable_item.go | 59 --- internal/fdv2proto/events.go | 74 ++-- internal/fdv2proto/selector.go | 36 +- .../sharedtest/mocks/mock_data_destination.go | 14 +- ldclient.go | 16 +- ldclient_end_to_end_fdv2_test.go | 275 ++++++++++++++ ldclient_evaluation_all_flags_test.go | 3 +- .../data_system_configuration_builder.go | 199 ++++++++++ .../polling_data_source_builder_v2.go | 26 +- .../polling_data_source_builder_v2_test.go | 17 +- .../streaming_data_source_builder_v2.go | 20 +- .../streaming_data_source_builder_v2_test.go | 19 +- subsystems/data_destination.go | 4 +- subsystems/data_source.go | 60 ++- subsystems/datasystem_configuration.go | 22 ++ testhelpers/ldservicesv2/package.go | 2 + testhelpers/ldservicesv2/server_sdk_data.go | 87 +++++ .../streaming_protocol_builder.go | 83 +++++ testservice/sdk_client_entity.go | 267 ++++++++++---- 
testservice/servicedef/sdk_config.go | 38 ++ 32 files changed, 1942 insertions(+), 531 deletions(-) delete mode 100644 internal/datasourcev2/polling_response.go create mode 100644 internal/datasystem/fdv2_datasystem.go create mode 100644 internal/fdv2proto/changeset.go create mode 100644 internal/fdv2proto/changeset_test.go create mode 100644 internal/fdv2proto/deltas_to_storable_items.go delete mode 100644 internal/fdv2proto/event_to_storable_item.go create mode 100644 ldclient_end_to_end_fdv2_test.go create mode 100644 ldcomponents/data_system_configuration_builder.go create mode 100644 subsystems/datasystem_configuration.go create mode 100644 testhelpers/ldservicesv2/package.go create mode 100644 testhelpers/ldservicesv2/server_sdk_data.go create mode 100644 testhelpers/ldservicesv2/streaming_protocol_builder.go diff --git a/config.go b/config.go index 5be351b1..949aea87 100644 --- a/config.go +++ b/config.go @@ -197,4 +197,24 @@ type Config struct { // LaunchDarkly provides integration packages, and most applications will not // need to implement their own hooks. Hooks []ldhooks.Hook + + // This field is not stable, and not subject to any backwards compatibility guarantees or semantic versioning. + // It is not suitable for production usage. Do not use it. You have been warned. + // + // DataSystem configures how data (e.g. flags, segments) are retrieved by the SDK. + // + // Set this field only if you want to specify non-default values for any of the data system configuration, + // such as defining an alternate data source or setting up a persistent store. + // + // Below, the default configuration is described with the relevant config item in parentheses: + // 1. The SDK will first attempt to fetch all data from LaunchDarkly's global Content Delivery Network (Initializer) + // 2. It will then establish a streaming connection with LaunchDarkly's realtime Flag Delivery Network (Primary + // Synchronizer.) + // 3. 
If at any point the connection to the realtime network is interrupted for a short period of time, + // the connection will be automatically re-established. + // 4. If the connection cannot be re-established over a sustained period, the SDK will begin to make periodic + // requests to LaunchDarkly's global CDN (Secondary Synchronizer) + // 5. After a period of time, the SDK will swap back to the realtime Flag Delivery Network if it becomes + // available again. + DataSystem subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration] } diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 2311fd54..d81837ad 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -1,6 +1,7 @@ package datasourcev2 import ( + "context" "sync" "time" @@ -21,7 +22,7 @@ const ( // PollingRequester allows PollingProcessor to delegate fetching data to another component. // This is useful for testing the PollingProcessor without needing to set up a test HTTP server. type PollingRequester interface { - Request() (*PollingResponse, error) + Request() (*fdv2proto.ChangeSet, error) BaseURI() string FilterKey() string } @@ -72,8 +73,24 @@ func newPollingProcessor( return pp } -//nolint:revive // no doc comment for standard method -func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) { +//nolint:revive // DataInitializer method. +func (pp *PollingProcessor) Name() string { + return "PollingDataSourceV2" +} + +//nolint:revive // DataInitializer method. +func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) { + //nolint:godox + // TODO(SDK-752): Plumb the context into the request method. + basis, err := pp.requester.Request() + if err != nil { + return nil, err + } + return &subsystems.Basis{Events: basis.Changes(), Selector: basis.Selector(), Persist: true}, nil +} + +//nolint:revive // DataSynchronizer method. 
+func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) { pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval) ticker := newTickerWithInitialTick(pp.pollInterval) @@ -142,21 +159,22 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) { } func (pp *PollingProcessor) poll() error { - response, err := pp.requester.Request() + changeSet, err := pp.requester.Request() if err != nil { return err } - if response.Cached() { - return nil - } - - switch response.Intent() { + code := changeSet.IntentCode() + switch code { case fdv2proto.IntentTransferFull: - pp.dataDestination.SetBasis(response.Events(), response.Selector(), true) + pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) case fdv2proto.IntentTransferChanges: - pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true) + pp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true) + case fdv2proto.IntentNone: + { + // no-op, we are already up-to-date. 
+ } } return nil diff --git a/internal/datasourcev2/polling_http_request.go b/internal/datasourcev2/polling_http_request.go index 0cd81ddf..191ec7b7 100644 --- a/internal/datasourcev2/polling_http_request.go +++ b/internal/datasourcev2/polling_http_request.go @@ -2,22 +2,19 @@ package datasourcev2 import ( "encoding/json" - "errors" "fmt" "io" "net/http" "net/url" - "github.com/launchdarkly/go-jsonstream/v3/jreader" "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/gregjones/httpcache" + "golang.org/x/exp/maps" + "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - - "github.com/gregjones/httpcache" - "golang.org/x/exp/maps" ) // pollingRequester is the internal implementation of getting flag/segment data from the LD polling endpoints. @@ -70,7 +67,7 @@ func (r *pollingRequester) FilterKey() string { return r.filterKey } -func (r *pollingRequester) Request() (*PollingResponse, error) { +func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) { if r.loggers.IsDebugEnabled() { r.loggers.Debug("Polling LaunchDarkly for feature flag updates") } @@ -80,7 +77,7 @@ func (r *pollingRequester) Request() (*PollingResponse, error) { return nil, err } if cached { - return NewCachedPollingResponse(), nil + return fdv2proto.NewChangeSetBuilder().NoChanges(), nil } var payload fdv2proto.PollingPayload @@ -88,102 +85,44 @@ func (r *pollingRequester) Request() (*PollingResponse, error) { return nil, malformedJSONError{err} } - parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { - dataKind, err := kind.ToFDV1() - if err != nil { - return ldstoretypes.ItemDescriptor{}, err - } - item, err := dataKind.DeserializeFromJSONReader(&r) - return item, err - } - - updates := make([]fdv2proto.Event, 0, len(payload.Events)) - - 
var intentCode fdv2proto.IntentCode + changeSet := fdv2proto.NewChangeSetBuilder() for _, event := range payload.Events { switch event.Name { case fdv2proto.EventServerIntent: - { - var serverIntent fdv2proto.ServerIntent - err := json.Unmarshal(event.Data, &serverIntent) - if err != nil { - return nil, err - } else if len(serverIntent.Payloads) == 0 { - return nil, errors.New("server-intent event has no payloads") - } - - intentCode = serverIntent.Payloads[0].Code - if intentCode == fdv2proto.IntentNone { - return NewCachedPollingResponse(), nil - } + var serverIntent fdv2proto.ServerIntent + err := json.Unmarshal(event.Data, &serverIntent) + if err != nil { + return nil, err + } + if serverIntent.Payload.Code == fdv2proto.IntentNone { + return changeSet.NoChanges(), nil + } + if err := changeSet.Start(serverIntent); err != nil { + return nil, err } case fdv2proto.EventPutObject: - { - r := jreader.NewReader(event.Data) - - var ( - key string - kind fdv2proto.ObjectKind - item ldstoretypes.ItemDescriptor - err error - version int - ) - - for obj := r.Object().WithRequiredProperties([]string{ - versionField, kindField, keyField, objectField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - case keyField: - key = r.String() - case objectField: - item, err = parseItem(r, kind) - if err != nil { - return nil, err - } - } - } - updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version}) + var put fdv2proto.PutObject + if err := json.Unmarshal(event.Data, &put); err != nil { + return nil, err } + changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object) case fdv2proto.EventDeleteObject: - { - r := jreader.NewReader(event.Data) - - var ( - version int - kind fdv2proto.ObjectKind - key string - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { - switch string(obj.Name()) 
{ - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - } - } - updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) + var deleteObject fdv2proto.DeleteObject + if err := json.Unmarshal(event.Data, &deleteObject); err != nil { + return nil, err } + changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version) case fdv2proto.EventPayloadTransferred: - //nolint:godox - // TODO: deserialize the state and create a fdv2proto.Selector. + var selector fdv2proto.Selector + if err := json.Unmarshal(event.Data, &selector); err != nil { + return nil, err + } + return changeSet.Finish(selector) } } - if intentCode == "" { - return nil, errors.New("no server-intent event found in polling response") - } - - return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil + return nil, fmt.Errorf("didn't receive any known protocol events in polling payload") } func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) { diff --git a/internal/datasourcev2/polling_response.go b/internal/datasourcev2/polling_response.go deleted file mode 100644 index 80b2c36f..00000000 --- a/internal/datasourcev2/polling_response.go +++ /dev/null @@ -1,48 +0,0 @@ -package datasourcev2 - -import "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - -// PollingResponse represents the result of a polling request. -type PollingResponse struct { - events []fdv2proto.Event - cached bool - intent fdv2proto.IntentCode - selector *fdv2proto.Selector -} - -// NewCachedPollingResponse indicates that the response has not changed. 
-func NewCachedPollingResponse() *PollingResponse { - return &PollingResponse{ - cached: true, - } -} - -// NewPollingResponse indicates that data was received. -func NewPollingResponse(intent fdv2proto.IntentCode, events []fdv2proto.Event, - selector *fdv2proto.Selector) *PollingResponse { - return &PollingResponse{ - events: events, - intent: intent, - selector: selector, - } -} - -// Events returns the events in the response. -func (p *PollingResponse) Events() []fdv2proto.Event { - return p.events -} - -// Cached returns true if the response was cached, meaning data has not changed. -func (p *PollingResponse) Cached() bool { - return p.cached -} - -// Intent returns the server intent code of the response. -func (p *PollingResponse) Intent() fdv2proto.IntentCode { - return p.intent -} - -// Selector returns the Selector of the response. -func (p *PollingResponse) Selector() *fdv2proto.Selector { - return p.selector -} diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 3c02812d..20dbddd1 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -10,7 +10,10 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - "github.com/launchdarkly/go-jsonstream/v3/jreader" + "context" + + es "github.com/launchdarkly/eventsource" + "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-sdk-common/v3/ldtime" ldevents "github.com/launchdarkly/go-sdk-events/v3" @@ -19,19 +22,11 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - - es "github.com/launchdarkly/eventsource" "golang.org/x/exp/maps" ) const ( - keyField = "key" - kindField = "kind" - versionField = "version" - objectField = "object" - 
streamReadTimeout = 5 * time.Minute // the LaunchDarkly stream should send a heartbeat comment every 3 minutes streamMaxRetryDelay = 30 * time.Second streamRetryResetInterval = 60 * time.Second @@ -42,11 +37,6 @@ const ( streamingWillRetryMessage = "will retry" ) -type changeSet struct { - intent *fdv2proto.ServerIntent - events []es.Event -} - // Implementation of the streaming data source, not including the lower-level SSE implementation which is in // the eventsource package. // @@ -89,7 +79,6 @@ type StreamProcessor struct { connectionAttemptLock sync.Mutex readyOnce sync.Once closeOnce sync.Once - persist bool } // NewStreamProcessor creates the internal implementation of the streaming data source. @@ -106,7 +95,6 @@ func NewStreamProcessor( loggers: context.GetLogging().Loggers, halt: make(chan struct{}), cfg: cfg, - persist: true, } if cci, ok := context.(*internal.ClientContextImpl); ok { sp.diagnosticsManager = cci.DiagnosticsManager @@ -122,17 +110,28 @@ func NewStreamProcessor( return sp } +//nolint:revive // DataInitializer method. +func (sp *StreamProcessor) Name() string { + return "StreamingDataSourceV2" +} + +//nolint:revive // DataInitializer method. +func (sp *StreamProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) { + return nil, errors.New("StreamProcessor does not implement Fetch capability") +} + //nolint:revive // no doc comment for standard method func (sp *StreamProcessor) IsInitialized() bool { return sp.isInitialized.Get() } -//nolint:revive // no doc comment for standard method -func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) { +//nolint:revive // DataSynchronizer method. 
+func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) { sp.loggers.Info("Starting LaunchDarkly streaming connection") - go sp.subscribe(closeWhenReady) + go sp.subscribe(closeWhenReady, selector) } +//nolint:gocyclo func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<- struct{}) { // Consume remaining Events and Errors so we can garbage collect defer func() { @@ -144,9 +143,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } }() - currentChangeSet := changeSet{ - events: make([]es.Event, 0), - } + changeSetBuilder := fdv2proto.NewChangeSetBuilder() for { select { @@ -169,6 +166,9 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< shouldRestart := false gotMalformedEvent := func(event es.Event, err error) { + // The protocol should "forget" anything that happens upon receiving an error. + changeSetBuilder = fdv2proto.NewChangeSetBuilder() + if event == nil { sp.loggers.Errorf( "Received streaming events with malformed JSON data (%s); will restart stream", @@ -197,29 +197,42 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< case fdv2proto.EventHeartbeat: // Swallow the event and move on. case fdv2proto.EventServerIntent: - //nolint: godox - // TODO: Replace all this json unmarshalling with a nicer jreader implementation. + var serverIntent fdv2proto.ServerIntent err := json.Unmarshal([]byte(event.Data()), &serverIntent) if err != nil { gotMalformedEvent(event, err) break - } else if len(serverIntent.Payloads) == 0 { - gotMalformedEvent(event, errors.New("server-intent event has no payloads")) - break } - if serverIntent.Payloads[0].Code == "none" { - sp.loggers.Info("Server intent is none, skipping") - continue + // IntentNone is a special case where we won't receive a payload-transferred event, so we will need + // to instead immediately notify the client that we are initialized. 
+ if serverIntent.Payload.Code == fdv2proto.IntentNone { + sp.setInitializedAndNotifyClient(true, closeWhenReady) + break } - currentChangeSet = changeSet{events: make([]es.Event, 0), intent: &serverIntent} + if err := changeSetBuilder.Start(serverIntent); err != nil { + gotMalformedEvent(event, err) + break + } case fdv2proto.EventPutObject: - currentChangeSet.events = append(currentChangeSet.events, event) + var p fdv2proto.PutObject + err := json.Unmarshal([]byte(event.Data()), &p) + if err != nil { + gotMalformedEvent(event, err) + break + } + changeSetBuilder.AddPut(p.Kind, p.Key, p.Version, p.Object) case fdv2proto.EventDeleteObject: - currentChangeSet.events = append(currentChangeSet.events, event) + var d fdv2proto.DeleteObject + err := json.Unmarshal([]byte(event.Data()), &d) + if err != nil { + gotMalformedEvent(event, err) + break + } + changeSetBuilder.AddDelete(d.Kind, d.Key, d.Version) case fdv2proto.EventGoodbye: var goodbye fdv2proto.Goodbye err := json.Unmarshal([]byte(event.Data()), &goodbye) @@ -235,48 +248,44 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< var errorData fdv2proto.Error err := json.Unmarshal([]byte(event.Data()), &errorData) if err != nil { - //nolint: godox - // TODO: Confirm that an error means we have to discard - // everything that has come before. - currentChangeSet = changeSet{events: make([]es.Event, 0)} gotMalformedEvent(event, err) break } sp.loggers.Errorf("Error on %s: %s", errorData.PayloadID, errorData.Reason) - currentChangeSet = changeSet{events: make([]es.Event, 0)} - //nolint: godox - // TODO: Do we need to restart here? + // The protocol should "forget" anything that has happened, and expect that we will receive + // more messages in the future (starting with a server intent.) 
+ changeSetBuilder = fdv2proto.NewChangeSetBuilder() case fdv2proto.EventPayloadTransferred: - - selector := &fdv2proto.Selector{} - - err := json.Unmarshal([]byte(event.Data()), selector) + var selector fdv2proto.Selector + err := json.Unmarshal([]byte(event.Data()), &selector) if err != nil { gotMalformedEvent(event, err) break } - currentChangeSet.events = append(currentChangeSet.events, event) - updates, err := deserializeEvents(currentChangeSet.events) + // After calling Finish, the builder is ready to receive a new changeset. + changeSet, err := changeSetBuilder.Finish(selector) if err != nil { - sp.loggers.Errorf("Error processing changeset: %s", err) gotMalformedEvent(nil, err) break } - switch currentChangeSet.intent.Payloads[0].Code { + code := changeSet.IntentCode() + switch code { case fdv2proto.IntentTransferFull: - { - sp.dataDestination.SetBasis(updates, selector, true) - sp.setInitializedAndNotifyClient(true, closeWhenReady) - } + sp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) case fdv2proto.IntentTransferChanges: - sp.dataDestination.ApplyDelta(updates, selector, true) + sp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true) + case fdv2proto.IntentNone: + /* We don't expect to receive this, but it could be possible. In that case, it should be + equivalent to transferring no changes - a no-op. 
+ */ } - currentChangeSet = changeSet{events: make([]es.Event, 0)} + sp.setInitializedAndNotifyClient(true, closeWhenReady) + default: sp.loggers.Infof("Unexpected event found in stream: %s", event.Event()) } @@ -295,8 +304,12 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } } -func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) { - req, reqErr := http.NewRequest("GET", endpoints.AddPath(sp.cfg.URI, endpoints.StreamingRequestPath), nil) +func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) { + path := endpoints.AddPath(sp.cfg.URI, endpoints.StreamingRequestPath) + if selector.IsDefined() { + path = path + "?basis=" + selector.State() + } + req, reqErr := http.NewRequest("GET", path, nil) if reqErr != nil { sp.loggers.Errorf( "Unable to create a stream request; this is not a network problem, most likely a bad base URI: %s", @@ -446,77 +459,4 @@ func (sp *StreamProcessor) GetFilterKey() string { return sp.cfg.FilterKey } -func deserializeEvents(events []es.Event) ([]fdv2proto.Event, error) { - updates := make([]fdv2proto.Event, 0, len(events)) - - parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { - dataKind, err := kind.ToFDV1() - if err != nil { - return ldstoretypes.ItemDescriptor{}, err - } - item, err := dataKind.DeserializeFromJSONReader(&r) - return item, err - } - - for _, event := range events { - switch fdv2proto.EventName(event.Event()) { - case fdv2proto.EventPutObject: - r := jreader.NewReader([]byte(event.Data())) - - var ( - kind fdv2proto.ObjectKind - key string - version int - item ldstoretypes.ItemDescriptor - err error - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField, objectField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An 
unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - case objectField: - item, err = parseItem(r, kind) - if err != nil { - return updates, err - } - } - } - updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item.Item, Version: version}) - case fdv2proto.EventDeleteObject: - r := jreader.NewReader([]byte(event.Data())) - - var ( - version int - kind fdv2proto.ObjectKind - key string - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - } - } - updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) - } - } - - return updates, nil -} - // vim: foldmethod=marker foldlevel=0 diff --git a/internal/datastore/data_store_status_provider_impl.go b/internal/datastore/data_store_status_provider_impl.go index 19af353d..4a6bd08f 100644 --- a/internal/datastore/data_store_status_provider_impl.go +++ b/internal/datastore/data_store_status_provider_impl.go @@ -2,19 +2,34 @@ package datastore import ( "github.com/launchdarkly/go-server-sdk/v7/interfaces" - "github.com/launchdarkly/go-server-sdk/v7/subsystems" ) +// StatusMonitorable means a data store is capable of having its status monitored over time. +type StatusMonitorable interface { + // IsStatusMonitoringEnabled returns true if this data store implementation supports status + // monitoring. 
+ // + // This is normally only true for persistent data stores created with ldcomponents.PersistentDataStore(), + // but it could also be true for any custom DataStore implementation that makes use of the + // statusUpdater parameter provided to the DataStoreFactory. Returning true means that the store + // guarantees that if it ever enters an invalid state (that is, an operation has failed or it knows + // that operations cannot succeed at the moment), it will publish a status update, and will then + // publish another status update once it has returned to a valid state. + // + // The same value will be returned from DataStoreStatusProvider.IsStatusMonitoringEnabled(). + IsStatusMonitoringEnabled() bool +} + // dataStoreStatusProviderImpl is the internal implementation of DataStoreStatusProvider. It's not // exported because the rest of the SDK code only interacts with the public interface. type dataStoreStatusProviderImpl struct { - store subsystems.DataStore + store StatusMonitorable dataStoreUpdates *DataStoreUpdateSinkImpl } // NewDataStoreStatusProviderImpl creates the internal implementation of DataStoreStatusProvider. 
func NewDataStoreStatusProviderImpl( - store subsystems.DataStore, + store StatusMonitorable, dataStoreUpdates *DataStoreUpdateSinkImpl, ) interfaces.DataStoreStatusProvider { return &dataStoreStatusProviderImpl{ diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go new file mode 100644 index 00000000..77e57005 --- /dev/null +++ b/internal/datasystem/fdv2_datasystem.go @@ -0,0 +1,343 @@ +package datasystem + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/interfaces" + "github.com/launchdarkly/go-server-sdk/v7/internal" + "github.com/launchdarkly/go-server-sdk/v7/internal/datastore" + "github.com/launchdarkly/go-server-sdk/v7/subsystems" +) + +var _ subsystems.DataDestination = (*Store)(nil) +var _ subsystems.ReadOnlyStore = (*Store)(nil) + +type broadcasters struct { + dataSourceStatus *internal.Broadcaster[interfaces.DataSourceStatus] + dataStoreStatus *internal.Broadcaster[interfaces.DataStoreStatus] + flagChangeEvent *internal.Broadcaster[interfaces.FlagChangeEvent] +} + +// FDv2 is an implementation of the DataSystem interface that uses the Flag Delivery V2 protocol for +// obtaining and keeping data up-to-date. Additionally, it operates with an optional persistent store +// in read-only or read/write mode. +type FDv2 struct { + // Operates the in-memory and optional persistent store that backs data queries. + store *Store + + // List of initializers that are capable of obtaining an initial payload of data. + initializers []subsystems.DataInitializer + + // The primary synchronizer responsible for keeping data up-to-date. + primarySync subsystems.DataSynchronizer + + // The secondary synchronizer, in case the primary is unavailable. 
+ secondarySync subsystems.DataSynchronizer + + // Whether the SDK should make use of persistent store/initializers/synchronizers or not. + disabled bool + + loggers ldlog.Loggers + + // Cancel and wg are used to track and stop the goroutines used by the system. + cancel context.CancelFunc + wg sync.WaitGroup + + // The SDK client, via MakeClient, expects to pass a channel down into a data source which will then be + // closed when the source is considered to be ready or in a terminal state. This is what allows the initialization + // timeout logic to work correctly and return early - otherwise, users would have to wait the full init timeout + // before receiving a status update. The following are true: + // 1. Initializers may close the channel (because an initializer's job is to initialize the SDK!) + // 2. Synchronizers may close the channel (because an initializer might not be configured, or have failed) + // To ensure the channel is closed only once, we use a sync.Once wrapping the close() call. + readyOnce sync.Once + + // These broadcasters are mainly to satisfy the existing SDK contract with users to provide status updates for + // the data source, data store, and flag change events. These may be different in fdv2, but we attempt to implement + // them for now. + broadcasters *broadcasters + + // We hold a reference to the dataStoreStatusProvider because it's required for the public interface of the + // SDK client. + dataStoreStatusProvider interfaces.DataStoreStatusProvider + + dataSourceStatusProvider *dataStatusProvider + + // Protects status. + mu sync.Mutex + status interfaces.DataSourceStatus +} + +// NewFDv2 creates a new instance of the FDv2 data system. The first argument indicates if the system is enabled or +// disabled. 
+func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration], + clientContext *internal.ClientContextImpl) (*FDv2, error) { + store := NewStore(clientContext.GetLogging().Loggers) + + bcasters := &broadcasters{ + dataSourceStatus: internal.NewBroadcaster[interfaces.DataSourceStatus](), + dataStoreStatus: internal.NewBroadcaster[interfaces.DataStoreStatus](), + flagChangeEvent: internal.NewBroadcaster[interfaces.FlagChangeEvent](), + } + + fdv2 := &FDv2{ + store: store, + loggers: clientContext.GetLogging().Loggers, + broadcasters: bcasters, + dataSourceStatusProvider: &dataStatusProvider{}, + } + + // Unfortunate circular reference. + fdv2.dataSourceStatusProvider.system = fdv2 + + dataStoreUpdateSink := datastore.NewDataStoreUpdateSinkImpl(bcasters.dataStoreStatus) + clientContextCopy := *clientContext + clientContextCopy.DataStoreUpdateSink = dataStoreUpdateSink + clientContextCopy.DataDestination = store + clientContextCopy.DataSourceStatusReporter = fdv2 + + cfg, err := cfgBuilder.Build(clientContextCopy) + if err != nil { + return nil, err + } + + fdv2.initializers = cfg.Initializers + fdv2.primarySync = cfg.Synchronizers.Primary + fdv2.secondarySync = cfg.Synchronizers.Secondary + fdv2.disabled = disabled + + if cfg.Store != nil && !disabled { + // If there's a persistent Store, we should provide a status monitor and inform Store that it's present. + fdv2.dataStoreStatusProvider = datastore.NewDataStoreStatusProviderImpl(cfg.Store, dataStoreUpdateSink) + store.WithPersistence(cfg.Store, cfg.StoreMode, fdv2.dataStoreStatusProvider) + } else { + // If there's no persistent Store, we still need to satisfy the SDK's public interface of having + // a data Store status provider. So we create one that just says "I don't know what's going on". 
+ fdv2.dataStoreStatusProvider = datastore.NewDataStoreStatusProviderImpl(noStatusMonitoring{}, dataStoreUpdateSink) + } + + return fdv2, nil +} + +type noStatusMonitoring struct{} + +func (n noStatusMonitoring) IsStatusMonitoringEnabled() bool { + return false +} + +// Start starts the FDv2 data system. If not disabled, it will begin initializing via the configured +// initializers, and then start the primary synchronizer. +func (f *FDv2) Start(closeWhenReady chan struct{}) { + if f.disabled { + f.loggers.Infof("Data system is disabled, SDK will return application-defined default values") + close(closeWhenReady) + return + } + ctx, cancel := context.WithCancel(context.Background()) + f.cancel = cancel + f.launchTask(func() { + f.run(ctx, closeWhenReady) + }) +} + +func (f *FDv2) launchTask(task func()) { + f.wg.Add(1) + go func() { + defer f.wg.Done() + task() + }() +} + +func (f *FDv2) hasDataSources() bool { + return len(f.initializers) > 0 || f.primarySync != nil +} + +func (f *FDv2) run(ctx context.Context, closeWhenReady chan struct{}) { + selector := f.runInitializers(ctx, closeWhenReady) + + if f.hasDataSources() && f.dataStoreStatusProvider.IsStatusMonitoringEnabled() { + f.launchTask(func() { + f.runPersistentStoreOutageRecovery(ctx, f.dataStoreStatusProvider.AddStatusListener()) + }) + } + + f.runSynchronizers(ctx, closeWhenReady, selector) +} + +func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <-chan interfaces.DataStoreStatus) { + for { + select { + case newStoreStatus := <-statuses: + if newStoreStatus.Available { + // The Store has just transitioned from unavailable to available + if newStoreStatus.NeedsRefresh { + f.loggers.Warn("Reinitializing data Store from in-memory cache after after data Store outage") + if err := f.store.Commit(); err != nil { + f.loggers.Error("Failed to reinitialize data Store: %v", err) + } + } + } + case <-ctx.Done(): + return + } + } +} + +func (f *FDv2) runInitializers(ctx context.Context, 
closeWhenReady chan struct{}) fdv2proto.Selector { + for _, initializer := range f.initializers { + f.loggers.Infof("Attempting to initialize via %s", initializer.Name()) + basis, err := initializer.Fetch(ctx) + if errors.Is(err, context.Canceled) { + return fdv2proto.NoSelector() + } + if err != nil { + f.loggers.Warnf("Initializer %s failed: %v", initializer.Name(), err) + continue + } + f.loggers.Infof("Initialized via %s", initializer.Name()) + f.store.SetBasis(basis.Events, basis.Selector, basis.Persist) + f.readyOnce.Do(func() { + close(closeWhenReady) + }) + return basis.Selector + } + return fdv2proto.NoSelector() +} + +func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}, selector fdv2proto.Selector) { + // If the SDK was configured with no synchronizer, then (assuming no initializer succeeded), we should + // trigger the ready signal to let the call to MakeClient unblock immediately. + if f.primarySync == nil { + f.readyOnce.Do(func() { + close(closeWhenReady) + }) + return + } + + // We can't pass closeWhenReady to the data source, because it might have already been closed. + // Instead, create a "proxy" channel just for the data source; if that is closed, we close the real one + // using the sync.Once. + ready := make(chan struct{}) + f.primarySync.Sync(ready, selector) + + for { + select { + case <-ready: + f.readyOnce.Do(func() { + close(closeWhenReady) + }) + case <-ctx.Done(): + return + } + } +} + +// Stop shuts down the data system. It will close any active synchronizers. If initialization is in progress, +// it will cancel the process gracefully. +func (f *FDv2) Stop() error { + if f.cancel != nil { + f.cancel() + f.wg.Wait() + } + _ = f.store.Close() + if f.primarySync != nil { + _ = f.primarySync.Close() + } + if f.secondarySync != nil { + _ = f.secondarySync.Close() + } + return nil +} + +//nolint:revive // DataSystem method. 
+func (f *FDv2) Store() subsystems.ReadOnlyStore { + return f.store +} + +//nolint:revive // DataSystem method. +func (f *FDv2) DataAvailability() DataAvailability { + if f.store.Selector().IsDefined() { + return Refreshed + } + if f.store.IsInitialized() { + return Cached + } + return Defaults +} + +//nolint:revive // DataSystem method. +func (f *FDv2) DataSourceStatusBroadcaster() *internal.Broadcaster[interfaces.DataSourceStatus] { + return f.broadcasters.dataSourceStatus +} + +//nolint:revive // DataSystem method. +func (f *FDv2) DataSourceStatusProvider() interfaces.DataSourceStatusProvider { + return f.dataSourceStatusProvider +} + +//nolint:revive // DataSystem method. +func (f *FDv2) DataStoreStatusBroadcaster() *internal.Broadcaster[interfaces.DataStoreStatus] { + return f.broadcasters.dataStoreStatus +} + +//nolint:revive // DataSystem method. +func (f *FDv2) DataStoreStatusProvider() interfaces.DataStoreStatusProvider { + return f.dataStoreStatusProvider +} + +//nolint:revive // DataSystem method. +func (f *FDv2) FlagChangeEventBroadcaster() *internal.Broadcaster[interfaces.FlagChangeEvent] { + return f.broadcasters.flagChangeEvent +} + +//nolint:revive // DataSystem method. +func (f *FDv2) Offline() bool { + return f.disabled +} + +//nolint:revive // DataSourceStatusReporter method. 
+func (f *FDv2) UpdateStatus(status interfaces.DataSourceState, err interfaces.DataSourceErrorInfo) { + f.mu.Lock() + defer f.mu.Unlock() + f.status = interfaces.DataSourceStatus{ + State: status, + LastError: err, + StateSince: time.Now(), + } +} + +func (f *FDv2) getStatus() interfaces.DataSourceStatus { + f.mu.Lock() + defer f.mu.Unlock() + return f.status +} + +type dataStatusProvider struct { + system *FDv2 +} + +func (d *dataStatusProvider) GetStatus() interfaces.DataSourceStatus { + return d.system.getStatus() +} + +func (d *dataStatusProvider) AddStatusListener() <-chan interfaces.DataSourceStatus { + return d.system.broadcasters.dataSourceStatus.AddListener() +} + +func (d *dataStatusProvider) RemoveStatusListener(listener <-chan interfaces.DataSourceStatus) { + d.system.broadcasters.dataSourceStatus.RemoveListener(listener) +} + +func (d *dataStatusProvider) WaitFor(desiredState interfaces.DataSourceState, timeout time.Duration) bool { + //nolint:godox + // TODO(SDK-930): Implement dataStatusProvider for this data system. + panic("implement me") +} + +var _ interfaces.DataSourceStatusProvider = (*dataStatusProvider)(nil) diff --git a/internal/datasystem/store.go b/internal/datasystem/store.go index 2cb88318..b09615e7 100644 --- a/internal/datasystem/store.go +++ b/internal/datasystem/store.go @@ -68,7 +68,7 @@ type Store struct { active subsystems.ReadOnlyStore // Identifies the current data. - selector *fdv2proto.Selector + selector fdv2proto.Selector mu sync.RWMutex @@ -134,7 +134,7 @@ func (s *Store) WithPersistence(persistent subsystems.DataStore, mode subsystems } // Selector returns the current selector. -func (s *Store) Selector() *fdv2proto.Selector { +func (s *Store) Selector() fdv2proto.Selector { s.mu.RLock() defer s.mu.RUnlock() return s.selector @@ -152,8 +152,12 @@ func (s *Store) Close() error { // SetBasis sets the basis of the store. Any existing data is discarded. To request data persistence, // set persist to true. 
-func (s *Store) SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { - collections := fdv2proto.ToStorableItems(events) +func (s *Store) SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) { + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + s.loggers.Errorf("store: couldn't set basis due to malformed data: %v", err) + return + } s.mu.Lock() defer s.mu.Unlock() @@ -177,8 +181,12 @@ func (s *Store) shouldPersist() bool { // ApplyDelta applies a delta update to the store. ApplyDelta should not be called until SetBasis has been called. // To request data persistence, set persist to true. -func (s *Store) ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { - collections := fdv2proto.ToStorableItems(events) +func (s *Store) ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) { + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + s.loggers.Errorf("store: couldn't apply delta due to malformed data: %v", err) + return + } s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/datasystem/store_test.go b/internal/datasystem/store_test.go index f6701527..7272bc50 100644 --- a/internal/datasystem/store_test.go +++ b/internal/datasystem/store_test.go @@ -1,23 +1,28 @@ package datasystem import ( + "encoding/json" "errors" + "fmt" "math/rand" "sync" "testing" "time" - "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel" + "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/stretchr/testify/require" + "github.com/launchdarkly/go-server-sdk/v7/subsystems" + + "github.com/stretchr/testify/assert" + "github.com/launchdarkly/go-sdk-common/v3/ldlogtest" 
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl" "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - "github.com/stretchr/testify/assert" ) func TestStore_New(t *testing.T) { @@ -45,7 +50,7 @@ func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { none := fdv2proto.NoSelector() tests := []struct { name string - selector *fdv2proto.Selector + selector fdv2proto.Selector persist bool }{ {"with selector, persist", v1, true}, @@ -58,12 +63,28 @@ func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { logCapture := ldlogtest.NewMockLog() store := NewStore(logCapture.Loggers) defer store.Close() - store.SetBasis([]fdv2proto.Event{}, tt.selector, tt.persist) + store.SetBasis([]fdv2proto.Change{}, tt.selector, tt.persist) assert.True(t, store.IsInitialized()) }) } } +func MustMarshal(model any) json.RawMessage { + data, err := json.Marshal(model) + if err != nil { + panic(err) + } + return data +} + +func MinimalFlag(key string, version int) json.RawMessage { + return []byte(fmt.Sprintf(`{"key":"`+key+`","version": %v}`, version)) +} + +func MinimalSegment(key string, version int) json.RawMessage { + return []byte(fmt.Sprintf(`{"key":"`+key+`","version": %v}`, version)) +} + func TestStore_Commit(t *testing.T) { t.Run("absence of persistent store doesn't cause error when committing", func(t *testing.T) { logCapture := ldlogtest.NewMockLog() @@ -81,24 +102,26 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) defer store.Close() - // The store receives data as a list of events, but the persistent store receives them as an + // The store receives data as a list of changes, but the persistent store receives them as an // []ldstoretypes.Collection. 
- input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldmodel.FeatureFlag{}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: ldmodel.Segment{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } + // OK: basically we need to match up the JSON with the FlagBuilder stuff for this to work, a naive marshal won't work. + // The original data system PR has some infra for this. Maybe bring it in in this pr. output := []ldstoretypes.Collection{ { Kind: ldstoreimpl.Features(), Items: []ldstoretypes.KeyedItemDescriptor{ - {Key: "foo", Item: ldstoretypes.ItemDescriptor{Version: 1, Item: ldmodel.FeatureFlag{}}}, + {Key: "foo", Item: sharedtest.FlagDescriptor(ldbuilders.NewFlagBuilder("foo").Version(1).Build())}, }, }, { Kind: ldstoreimpl.Segments(), Items: []ldstoretypes.KeyedItemDescriptor{ - {Key: "bar", Item: ldstoretypes.ItemDescriptor{Version: 2, Item: ldmodel.Segment{}}}, + {Key: "bar", Item: sharedtest.SegmentDescriptor(ldbuilders.NewSegmentBuilder("bar").Version(2).Build())}, }, }} @@ -124,9 +147,9 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) defer store.Close() - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: 
MinimalSegment("bar", 2)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -148,9 +171,9 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeRead, nil) defer store.Close() - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("key", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } // Even though persist is true, the store was marked as read-only, so it shouldn't be written to. @@ -174,8 +197,8 @@ func TestStore_GetActive(t *testing.T) { assert.NoError(t, err) assert.Equal(t, foo, ldstoretypes.ItemDescriptor{}.NotFound()) - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -206,8 +229,8 @@ func TestStore_GetActive(t *testing.T) { _, err := store.Get(ldstoreimpl.Features(), "foo") assert.Equal(t, errImAPersistentStore, err) - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -230,22 +253,22 @@ func TestStore_SelectorIsRemembered(t *testing.T) { selector4 := fdv2proto.NewSelector("qux", 4) 
selector5 := fdv2proto.NewSelector("this better be the last one", 5) - store.SetBasis([]fdv2proto.Event{}, selector1, false) + store.SetBasis([]fdv2proto.Change{}, selector1, false) assert.Equal(t, selector1, store.Selector()) - store.SetBasis([]fdv2proto.Event{}, selector2, false) + store.SetBasis([]fdv2proto.Change{}, selector2, false) assert.Equal(t, selector2, store.Selector()) - store.ApplyDelta([]fdv2proto.Event{}, selector3, false) + store.ApplyDelta([]fdv2proto.Change{}, selector3, false) assert.Equal(t, selector3, store.Selector()) - store.ApplyDelta([]fdv2proto.Event{}, selector4, false) + store.ApplyDelta([]fdv2proto.Change{}, selector4, false) assert.Equal(t, selector4, store.Selector()) assert.NoError(t, store.Commit()) assert.Equal(t, selector4, store.Selector()) - store.SetBasis([]fdv2proto.Event{}, selector5, false) + store.SetBasis([]fdv2proto.Change{}, selector5, false) } func TestStore_Concurrency(t *testing.T) { @@ -278,10 +301,10 @@ func TestStore_Concurrency(t *testing.T) { _ = store.IsInitialized() }) go run(func() { - store.SetBasis([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + store.SetBasis([]fdv2proto.Change{}, fdv2proto.NoSelector(), true) }) go run(func() { - store.ApplyDelta([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + store.ApplyDelta([]fdv2proto.Change{}, fdv2proto.NoSelector(), true) }) go run(func() { _ = store.Selector() @@ -331,6 +354,7 @@ func (f *fakeStore) Close() error { // This matcher is required instead of calling ElementsMatch directly on two slices of collections because // the order of the collections, or the order within each collection, is not defined. 
func requireCollectionsMatch(t *testing.T, expected []ldstoretypes.Collection, actual []ldstoretypes.Collection) { + t.Helper() require.Equal(t, len(expected), len(actual)) for _, expectedCollection := range expected { for _, actualCollection := range actual { diff --git a/internal/fdv2proto/changeset.go b/internal/fdv2proto/changeset.go new file mode 100644 index 00000000..a41e548b --- /dev/null +++ b/internal/fdv2proto/changeset.go @@ -0,0 +1,120 @@ +package fdv2proto + +import ( + "encoding/json" + "errors" +) + +// ChangeType specifies if an object is being upserted or deleted. +type ChangeType string + +const ( + // ChangeTypePut represents an object being upserted. + ChangeTypePut = ChangeType("put") + + // ChangeTypeDelete represents an object being deleted. + ChangeTypeDelete = ChangeType("delete") +) + +// Change represents a change to a piece of data, such as an update or deletion. +type Change struct { + Action ChangeType + Kind ObjectKind + Key string + Version int + Object json.RawMessage +} + +// ChangeSet represents a list of changes to be applied. +type ChangeSet struct { + intentCode IntentCode + changes []Change + selector Selector +} + +// IntentCode represents the intent of the changeset. +func (c *ChangeSet) IntentCode() IntentCode { + return c.intentCode +} + +// Changes returns the individual changes that should be applied according +// to the intent. +func (c *ChangeSet) Changes() []Change { + return c.changes +} + +// Selector identifies the version of the changes. +func (c *ChangeSet) Selector() Selector { + return c.selector +} + +// ChangeSetBuilder is a helper for constructing a ChangeSet. +type ChangeSetBuilder struct { + intent *ServerIntent + changes []Change +} + +// NewChangeSetBuilder creates a new ChangeSetBuilder, which is empty by default. +func NewChangeSetBuilder() *ChangeSetBuilder { + return &ChangeSetBuilder{} +} + +// NoChanges represents an intent that the current data is up-to-date and doesn't +// require changes. 
+func (c *ChangeSetBuilder) NoChanges() *ChangeSet { + return &ChangeSet{ + intentCode: IntentNone, + selector: NoSelector(), + changes: nil, + } +} + +// Start begins a new change set with a given intent. +func (c *ChangeSetBuilder) Start(intent ServerIntent) error { + c.intent = &intent + c.changes = nil + return nil +} + +// Finish identifies a changeset with a selector, and returns the completed changeset. +// It clears any existing changes, while preserving the current intent, so that the builder can be reused. +func (c *ChangeSetBuilder) Finish(selector Selector) (*ChangeSet, error) { + if c.intent == nil { + return nil, errors.New("changeset: cannot complete without a server-intent") + } + changes := &ChangeSet{ + intentCode: c.intent.Payload.Code, + selector: selector, + changes: c.changes, + } + c.changes = nil + if c.intent.Payload.Code == IntentTransferFull { + //nolint:godox + // TODO(SDK-931): We have an awkward situation where we don't get a new intent after receiving a payload + // transferred message, so we need to assume the new intent. But we don't get new Reason/ID/Target, so we don't + // have complete information. + c.intent.Payload.Code = IntentTransferChanges + } + return changes, nil +} + +// AddPut adds a new object to the changeset. +func (c *ChangeSetBuilder) AddPut(kind ObjectKind, key string, version int, object json.RawMessage) { + c.changes = append(c.changes, Change{ + Action: ChangeTypePut, + Kind: kind, + Key: key, + Version: version, + Object: object, + }) +} + +// AddDelete adds a deletion to the changeset. 
+func (c *ChangeSetBuilder) AddDelete(kind ObjectKind, key string, version int) { + c.changes = append(c.changes, Change{ + Action: ChangeTypeDelete, + Kind: kind, + Key: key, + Version: version, + }) +} diff --git a/internal/fdv2proto/changeset_test.go b/internal/fdv2proto/changeset_test.go new file mode 100644 index 00000000..61576a53 --- /dev/null +++ b/internal/fdv2proto/changeset_test.go @@ -0,0 +1,82 @@ +package fdv2proto + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChangeSetBuilder_New(t *testing.T) { + builder := NewChangeSetBuilder() + assert.NotNil(t, builder) +} + +func TestChangeSetBuilder_MustStartToFinish(t *testing.T) { + builder := NewChangeSetBuilder() + selector := NewSelector("foo", 1) + _, err := builder.Finish(selector) + assert.Error(t, err) + + assert.NoError(t, builder.Start(ServerIntent{Payload: Payload{Code: IntentNone}})) + + _, err = builder.Finish(selector) + assert.NoError(t, err) +} + +func TestChangeSetBuilder_Changes(t *testing.T) { + builder := NewChangeSetBuilder() + err := builder.Start(ServerIntent{Payload: Payload{Code: IntentTransferChanges}}) + assert.NoError(t, err) + + builder.AddPut("foo", "bar", 1, []byte("baz")) + builder.AddDelete("foo", "bar", 1) + + selector := NewSelector("foo", 1) + changeSet, err := builder.Finish(selector) + assert.NoError(t, err) + assert.NotNil(t, changeSet) + + changes := changeSet.Changes() + assert.Equal(t, 2, len(changes)) + assert.Equal(t, Change{Action: ChangeTypePut, Kind: "foo", Key: "bar", Version: 1, Object: []byte("baz")}, changes[0]) + assert.Equal(t, Change{Action: ChangeTypeDelete, Kind: "foo", Key: "bar", Version: 1}, changes[1]) + + assert.Equal(t, IntentTransferChanges, changeSet.IntentCode()) + assert.Equal(t, selector, changeSet.Selector()) + +} + +// After receiving an intent, the SDK may receive 1 or more objects before receiving a payload-transferred. 
+// At that point, LaunchDarkly may send more objects followed by another payload-transferred. These objects +// should be regarded as part of an implicit "xfer-changes" intent, even though the server doesn't actually send one. +// If the server intends to use an xfer-full instead (for efficiency or other reasons), it will need to explicitly +// send one. +func TestChangeSetBuilder_ImplicitIntentXferChanges(t *testing.T) { + builder := NewChangeSetBuilder() + err := builder.Start(ServerIntent{Payload: Payload{Code: IntentTransferFull}}) + assert.NoError(t, err) + + changes1, err := builder.Finish(NewSelector("foo", 1)) + assert.NoError(t, err) + assert.Equal(t, IntentTransferFull, changes1.IntentCode()) + + builder.AddPut("foo", "bar", 1, []byte("baz")) + changes2, err := builder.Finish(NewSelector("bar", 2)) + assert.NoError(t, err) + + assert.Equal(t, IntentTransferChanges, changes2.IntentCode()) +} + +func TestChangeSetBuilder_NoChanges(t *testing.T) { + builder := NewChangeSetBuilder() + changeSet := builder.NoChanges() + assert.NotNil(t, changeSet) + + intent := changeSet.IntentCode() + assert.NotNil(t, intent) + + assert.Equal(t, IntentNone, intent) + + assert.False(t, changeSet.Selector().IsDefined()) + assert.Equal(t, NoSelector(), changeSet.Selector()) +} diff --git a/internal/fdv2proto/deltas_to_storable_items.go b/internal/fdv2proto/deltas_to_storable_items.go new file mode 100644 index 00000000..089b96eb --- /dev/null +++ b/internal/fdv2proto/deltas_to_storable_items.go @@ -0,0 +1,59 @@ +package fdv2proto + +import ( + "github.com/launchdarkly/go-jsonstream/v3/jreader" + "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion +// into a data store. 
+func ToStorableItems(deltas []Change) ([]ldstoretypes.Collection, error) { + collections := make(kindMap) + for _, event := range deltas { + kind, ok := event.Kind.ToFDV1() + if !ok { + // If we don't recognize this kind, it's not an error and should be ignored for forwards + // compatibility. + continue + } + + switch event.Action { + case ChangeTypePut: + // A put requires deserializing the item. We delegate to the optimized streaming JSON + // parser. + reader := jreader.NewReader(event.Object) + item, err := kind.DeserializeFromJSONReader(&reader) + if err != nil { + return nil, err + } + collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{ + Key: event.Key, + Item: item, + }) + case ChangeTypeDelete: + // A deletion is represented by a tombstone, which is an ItemDescriptor with a version and nil item. + collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{ + Key: event.Key, + Item: ldstoretypes.ItemDescriptor{Version: event.Version, Item: nil}, + }) + default: + // An unknown action isn't an error, and should be ignored for forwards compatibility. + continue + } + } + + return collections.flatten(), nil +} + +type kindMap map[datakinds.DataKindInternal][]ldstoretypes.KeyedItemDescriptor + +func (k kindMap) flatten() (result []ldstoretypes.Collection) { + for kind, items := range k { + result = append(result, ldstoretypes.Collection{ + Kind: kind, + Items: items, + }) + } + return +} diff --git a/internal/fdv2proto/event_to_storable_item.go b/internal/fdv2proto/event_to_storable_item.go deleted file mode 100644 index 379ff12d..00000000 --- a/internal/fdv2proto/event_to_storable_item.go +++ /dev/null @@ -1,59 +0,0 @@ -package fdv2proto - -import ( - "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" -) - -// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion -// into a data store. 
-func ToStorableItems(events []Event) []ldstoretypes.Collection { - flagCollection := ldstoretypes.Collection{ - Kind: datakinds.Features, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0), - } - - segmentCollection := ldstoretypes.Collection{ - Kind: datakinds.Segments, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0), - } - - for _, event := range events { - switch e := event.(type) { - case PutObject: - switch e.Kind { - case FlagKind: - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, - }) - case SegmentKind: - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, - }) - } - case DeleteObject: - switch e.Kind { - case FlagKind: - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{ - Version: e.Version, - Item: nil, - }, - }) - case SegmentKind: - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{ - Version: e.Version, - Item: nil, - }, - }) - } - } - } - - return []ldstoretypes.Collection{flagCollection, segmentCollection} -} diff --git a/internal/fdv2proto/events.go b/internal/fdv2proto/events.go index a28fc986..2895feb5 100644 --- a/internal/fdv2proto/events.go +++ b/internal/fdv2proto/events.go @@ -1,8 +1,8 @@ package fdv2proto import ( + "encoding/json" "errors" - "fmt" "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" ) @@ -63,56 +63,56 @@ const ( SegmentKind = ObjectKind("segment") ) -// ErrUnknownKind represents that a given ObjectKind had no FDv1 equivalent -// DataKind. -type ErrUnknownKind struct { - kind ObjectKind -} - -// Is returns true if the error is an ErrUnknownKind. 
-func (e *ErrUnknownKind) Is(err error) bool { - var errUnknownKind *ErrUnknownKind - ok := errors.As(err, &errUnknownKind) - return ok -} - -func (e *ErrUnknownKind) Error() string { - return fmt.Sprintf("unknown object kind: %s", e.kind) -} - // ToFDV1 converts the object kind to an FDv1 data kind. If there is no equivalent, it returns // an ErrUnknownKind. -func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, error) { +func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, bool) { switch o { case FlagKind: - return datakinds.Features, nil + return datakinds.Features, true case SegmentKind: - return datakinds.Segments, nil + return datakinds.Segments, true default: - return nil, &ErrUnknownKind{o} + return nil, false } } // ServerIntent represents the server's intent. type ServerIntent struct { - // Payloads is a list of payloads, defined to be at least length 1. - Payloads []Payload `json:"payloads"` + Payload Payload } -//nolint:revive // Event method. -func (ServerIntent) Name() EventName { - return EventServerIntent +// UnmarshalJSON unmarshals a ServerIntent from JSON. The intent is required to have at least +// one payload (at index 0) at this time. +func (s *ServerIntent) UnmarshalJSON(data []byte) error { + // Actual protocol object contains a list of payloads, but currently SDKs only support 1. It is a protocol + // error for this list to be empty. + type serverIntent struct { + Payloads []Payload `json:"payloads"` + } + var intent serverIntent + if err := json.Unmarshal(data, &intent); err != nil { + return err + } + if len(intent.Payloads) == 0 { + return errors.New("changeset: server-intent event has no payloads") + } + s.Payload = intent.Payloads[0] + return nil } -// PayloadTransferred represents the fact that all payload objects have been sent. -type PayloadTransferred struct { - State string `json:"state"` - Version int `json:"version"` +// MarshalJSON marshals a ServerIntent to JSON. 
The intent object has only one payload but the +// protocol allows for >= 1, so this creates an array. +func (s ServerIntent) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Payloads []Payload `json:"payloads"` + }{ + Payloads: []Payload{s.Payload}, + }) } //nolint:revive // Event method. -func (p PayloadTransferred) Name() EventName { - return EventPayloadTransferred +func (ServerIntent) Name() EventName { + return EventServerIntent } // DeleteObject specifies the deletion of a particular object. @@ -129,10 +129,10 @@ func (d DeleteObject) Name() EventName { // PutObject specifies the addition of a particular object with upsert semantics. type PutObject struct { - Version int `json:"version"` - Kind ObjectKind `json:"kind"` - Key string `json:"key"` - Object any `json:"object"` + Version int `json:"version"` + Kind ObjectKind `json:"kind"` + Key string `json:"key"` + Object json.RawMessage `json:"object"` } //nolint:revive // Event method. diff --git a/internal/fdv2proto/selector.go b/internal/fdv2proto/selector.go index 9e7878fe..5925b708 100644 --- a/internal/fdv2proto/selector.go +++ b/internal/fdv2proto/selector.go @@ -11,29 +11,33 @@ type Selector struct { version int } -// NoSelector returns a nil Selector, representing the lack of one. It is -// here only for readability at call sites. -func NoSelector() *Selector { - return nil +// NoSelector returns an empty Selector. +func NoSelector() Selector { + return Selector{} } -// NewSelector creates a new Selector from a state string and version. -func NewSelector(state string, version int) *Selector { - return &Selector{state: state, version: version} +// IsDefined returns true if the Selector has a value. +func (s Selector) IsDefined() bool { + return s != NoSelector() } -// IsSet returns true if the Selector is not nil. -func (s *Selector) IsSet() bool { - return s != nil +//nolint:revive // Event method. 
+func (s Selector) Name() EventName { + return EventPayloadTransferred +} + +// NewSelector creates a new Selector from a state string and version. +func NewSelector(state string, version int) Selector { + return Selector{state: state, version: version} } // State returns the state string of the Selector. This cannot be called if the Selector is nil. -func (s *Selector) State() string { +func (s Selector) State() string { return s.state } // Version returns the version of the Selector. This cannot be called if the Selector is nil. -func (s *Selector) Version() int { +func (s Selector) Version() int { return s.version } @@ -56,3 +60,11 @@ func (s *Selector) UnmarshalJSON(data []byte) error { } return nil } + +// MarshalJSON marshals a Selector to JSON. +func (s Selector) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]interface{}{ + "state": s.state, + "version": s.version, + }) +} diff --git a/internal/sharedtest/mocks/mock_data_destination.go b/internal/sharedtest/mocks/mock_data_destination.go index faa01cb6..97bd6537 100644 --- a/internal/sharedtest/mocks/mock_data_destination.go +++ b/internal/sharedtest/mocks/mock_data_destination.go @@ -44,11 +44,14 @@ func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination } // SetBasis in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { +func (d *MockDataDestination) SetBasis(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) { // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. 
- collections := fdv2proto.ToStorableItems(events) + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + panic("MockDataDestination.SetBasis received malformed data: " + err.Error()) + } for _, coll := range collections { AssertNotNil(coll.Kind) @@ -57,11 +60,14 @@ func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Se } // ApplyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { +func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) { // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. - collections := fdv2proto.ToStorableItems(events) + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + panic("MockDataDestination.ApplyDelta received malformed data: " + err.Error()) + } for _, coll := range collections { AssertNotNil(coll.Kind) diff --git a/ldclient.go b/ldclient.go index b40c88b0..fb0926bb 100644 --- a/ldclient.go +++ b/ldclient.go @@ -243,11 +243,19 @@ func MakeCustomClient(sdkKey string, config Config, waitFor time.Duration) (*LDC client.offline = config.Offline - system, err := datasystem.NewFDv1(config.Offline, config.DataStore, config.DataSource, clientContext) - if err != nil { - return nil, err + if config.DataSystem == nil { + system, err := datasystem.NewFDv1(config.Offline, config.DataStore, config.DataSource, clientContext) + if err != nil { + return nil, err + } + client.dataSystem = system + } else { + system, err := datasystem.NewFDv2(config.Offline, config.DataSystem, clientContext) + if err != nil { + return nil, err + } + client.dataSystem = system } - client.dataSystem = system bigSegments := config.BigSegments if bigSegments == nil { diff --git a/ldclient_end_to_end_fdv2_test.go b/ldclient_end_to_end_fdv2_test.go new file 
mode 100644 index 00000000..74a9c949 --- /dev/null +++ b/ldclient_end_to_end_fdv2_test.go @@ -0,0 +1,275 @@ +package ldclient + +import ( + "crypto/x509" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/launchdarkly/go-sdk-common/v3/ldlog" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" + "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldservicesv2" + + "github.com/launchdarkly/go-sdk-common/v3/ldlogtest" + "github.com/launchdarkly/go-server-sdk/v7/interfaces" + "github.com/launchdarkly/go-server-sdk/v7/ldcomponents" + "github.com/launchdarkly/go-server-sdk/v7/testhelpers/ldservices" + + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// This tests the sequential two-phase nature of the Flag Delivery V2 protocol. First, a polling initializer +// attempts to grab a payload, but the mock handler will return a 500. The initializer will fail and +// the primary synchronizer (streaming mode) will then make a streaming request. This succeeds, returning a payload +// containing a true flag. +func TestFDV2DefaultIsTwoPhaseInit(t *testing.T) { + + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + // The polling initializer will fail since we return a 500. + pollRecordingHandler, _ := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(500)) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(fdv2proto.ServerIntent{Payload: fdv2proto.Payload{ + ID: "fake-id", Target: 0, Code: "xfer-full", Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). + WithTransferred(1) + + // The streaming synchronizer will receive the FDv2 protocol messages, including the true flag. 
+ streamHandler, streamSender := ldservices.ServerSideStreamingServiceHandler(protocol.Next()) + protocol.Enqueue(streamSender) + + streamRecordingHandler, _ := httphelpers.RecordingHandler(streamHandler) + + // Use a sequential handler so that the first request is serviced by the polling handler, and the second + // by the streaming. + handler := httphelpers.SequentialHandler(pollRecordingHandler, streamRecordingHandler) + + httphelpers.WithServer(handler, func(server *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem().WithRelayProxyEndpoints(server.URL).Default(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + require.NoError(t, err) + defer client.Close() + + assert.Equal(t, string(interfaces.DataSourceStateValid), string(client.GetDataSourceStatusProvider().GetStatus().State)) + + value, _ := client.BoolVariation(alwaysTrueFlag.Key, testUser, false) + assert.True(t, value) + + }) +} + +func TestFDV2StreamingSynchronizer(t *testing.T) { + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(fdv2proto.ServerIntent{Payload: fdv2proto.Payload{ + ID: "fake-id", Target: 0, Code: "xfer-full", Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). 
+ WithTransferred(1) + + streamHandler, streamSender := ldservices.ServerSideStreamingServiceHandler(protocol.Next()) + protocol.Enqueue(streamSender) + + handler, requestsCh := httphelpers.RecordingHandler(streamHandler) + httphelpers.WithServer(handler, func(streamServer *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + defer logCapture.DumpIfTestFailed(t) + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + ServiceEndpoints: interfaces.ServiceEndpoints{Streaming: streamServer.URL}, + DataSystem: ldcomponents.DataSystem().WithEndpoints( + ldcomponents.Endpoints{Streaming: streamServer.URL}, + ).Streaming(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + require.NoError(t, err) + defer client.Close() + + assert.Equal(t, string(interfaces.DataSourceStateValid), string(client.GetDataSourceStatusProvider().GetStatus().State)) + + value, _ := client.BoolVariation(alwaysTrueFlag.Key, testUser, false) + assert.True(t, value) + + r := <-requestsCh + assert.Equal(t, testSdkKey, r.Request.Header.Get("Authorization")) + assertNoMoreRequests(t, requestsCh) + + assert.Len(t, logCapture.GetOutput(ldlog.Error), 0) + assert.Len(t, logCapture.GetOutput(ldlog.Warn), 0) + }) +} + +func TestFDV2StreamingSynchronizeReconnectsWithNonFatalError(t *testing.T) { + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(fdv2proto.ServerIntent{Payload: fdv2proto.Payload{ + ID: "fake-id", Target: 0, Code: "xfer-full", Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). 
+ WithTransferred(1) + + streamHandler, streamSender := ldservices.ServerSideStreamingServiceHandler(protocol.Next()) + protocol.Enqueue(streamSender) + + failThenSucceedHandler := httphelpers.SequentialHandler(httphelpers.HandlerWithStatus(503), streamHandler) + handler, requestsCh := httphelpers.RecordingHandler(failThenSucceedHandler) + httphelpers.WithServer(handler, func(streamServer *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + ServiceEndpoints: interfaces.ServiceEndpoints{Streaming: streamServer.URL}, + DataSystem: ldcomponents.DataSystem().WithEndpoints( + ldcomponents.Endpoints{Streaming: streamServer.URL}).Streaming(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + require.NoError(t, err) + defer client.Close() + + assert.Equal(t, string(interfaces.DataSourceStateValid), string(client.GetDataSourceStatusProvider().GetStatus().State)) + + value, _ := client.BoolVariation(alwaysTrueFlag.Key, testUser, false) + assert.True(t, value) + + r0 := <-requestsCh + assert.Equal(t, testSdkKey, r0.Request.Header.Get("Authorization")) + r1 := <-requestsCh + assert.Equal(t, testSdkKey, r1.Request.Header.Get("Authorization")) + assertNoMoreRequests(t, requestsCh) + + expectedWarning := "Error in stream connection (will retry): HTTP error 503" + assert.Equal(t, []string{expectedWarning}, logCapture.GetOutput(ldlog.Warn)) + assert.Len(t, logCapture.GetOutput(ldlog.Error), 0) + }) +} + +func TestFDV2PollingSynchronizerFailsToStartWith401Error(t *testing.T) { + handler, requestsCh := httphelpers.RecordingHandler(httphelpers.HandlerWithStatus(401)) + httphelpers.WithServer(handler, func(pollServer *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + DataSystem: ldcomponents.DataSystem(). 
+ WithEndpoints(ldcomponents.Endpoints{Polling: pollServer.URL}).Polling(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*5) + require.Error(t, err) + require.NotNil(t, client) + defer client.Close() + + assert.Equal(t, initializationFailedErrorMessage, err.Error()) + + assert.Equal(t, string(interfaces.DataSourceStateOff), string(client.GetDataSourceStatusProvider().GetStatus().State)) + + value, _ := client.BoolVariation(alwaysTrueFlag.Key, testUser, false) + assert.False(t, value) + + r := <-requestsCh + assert.Equal(t, testSdkKey, r.Request.Header.Get("Authorization")) + assertNoMoreRequests(t, requestsCh) + + expectedError := "Error on polling request (giving up permanently): HTTP error 401 (invalid SDK key)" + assert.Equal(t, []string{expectedError}, logCapture.GetOutput(ldlog.Error)) + assert.Equal(t, []string{pollingModeWarningMessage, initializationFailedErrorMessage}, logCapture.GetOutput(ldlog.Warn)) + }) +} + +func TestFDV2StreamingSynchronizerUsesCustomTLSConfiguration(t *testing.T) { + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(fdv2proto.ServerIntent{Payload: fdv2proto.Payload{ + ID: "fake-id", Target: 0, Code: "xfer-full", Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). 
+ WithTransferred(1) + + streamHandler, streamSender := ldservices.ServerSideStreamingServiceHandler(protocol.Next()) + protocol.Enqueue(streamSender) + + httphelpers.WithSelfSignedServer(streamHandler, func(server *httptest.Server, certData []byte, certs *x509.CertPool) { + + config := Config{ + Events: ldcomponents.NoEvents(), + HTTP: ldcomponents.HTTPConfiguration().CACert(certData), + Logging: ldcomponents.Logging().Loggers(sharedtest.NewTestLoggers()), + DataSystem: ldcomponents.DataSystem().WithEndpoints( + ldcomponents.Endpoints{Streaming: server.URL}).Streaming(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Second*50000) + require.NoError(t, err) + defer client.Close() + + value, _ := client.BoolVariation(alwaysTrueFlag.Key, testUser, false) + assert.True(t, value) + }) +} + +func TestFDV2StreamingSynchronizerTimesOut(t *testing.T) { + data := ldservicesv2.NewServerSDKData().Flags(alwaysTrueFlag) + + protocol := ldservicesv2.NewStreamingProtocol(). + WithIntent(fdv2proto.ServerIntent{Payload: fdv2proto.Payload{ + ID: "fake-id", Target: 0, Code: "xfer-full", Reason: "payload-missing", + }}). + WithPutObjects(data.ToPutObjects()). 
+ WithTransferred(1) + + streamHandler, streamSender := ldservices.ServerSideStreamingServiceHandler(protocol.Next()) + protocol.Enqueue(streamSender) + + slowHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + time.Sleep(300 * time.Millisecond) + streamHandler.ServeHTTP(w, r) + }) + + httphelpers.WithServer(slowHandler, func(streamServer *httptest.Server) { + logCapture := ldlogtest.NewMockLog() + + config := Config{ + Events: ldcomponents.NoEvents(), + Logging: ldcomponents.Logging().Loggers(logCapture.Loggers), + ServiceEndpoints: interfaces.ServiceEndpoints{Streaming: streamServer.URL}, + DataSystem: ldcomponents.DataSystem().WithEndpoints( + ldcomponents.Endpoints{Streaming: streamServer.URL}).Streaming(), + } + + client, err := MakeCustomClient(testSdkKey, config, time.Millisecond*100) + require.Error(t, err) + require.NotNil(t, client) + defer client.Close() + + assert.Equal(t, "timeout encountered waiting for LaunchDarkly client initialization", err.Error()) + + value, _ := client.BoolVariation(alwaysTrueFlag.Key, testUser, false) + assert.False(t, value) + + assert.Equal(t, []string{"Timeout encountered waiting for LaunchDarkly client initialization"}, logCapture.GetOutput(ldlog.Warn)) + assert.Len(t, logCapture.GetOutput(ldlog.Error), 0) + }) +} diff --git a/ldclient_evaluation_all_flags_test.go b/ldclient_evaluation_all_flags_test.go index 4ead0168..9858ae85 100644 --- a/ldclient_evaluation_all_flags_test.go +++ b/ldclient_evaluation_all_flags_test.go @@ -2,9 +2,10 @@ package ldclient import ( "errors" - "github.com/stretchr/testify/require" "testing" + "github.com/stretchr/testify/require" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest/mocks" "github.com/launchdarkly/go-sdk-common/v3/ldlog" diff --git a/ldcomponents/data_system_configuration_builder.go b/ldcomponents/data_system_configuration_builder.go new file mode 100644 index 00000000..695ad20a --- /dev/null +++ 
b/ldcomponents/data_system_configuration_builder.go @@ -0,0 +1,199 @@ +package ldcomponents + +import ( + "errors" + "fmt" + + ss "github.com/launchdarkly/go-server-sdk/v7/subsystems" +) + +// DataSystemConfigurationBuilder is a builder for configuring the SDK's data acquisition strategy. +type DataSystemConfigurationBuilder struct { + storeBuilder ss.ComponentConfigurer[ss.DataStore] + storeMode ss.DataStoreMode + initializerBuilders []ss.ComponentConfigurer[ss.DataInitializer] + primarySyncBuilder ss.ComponentConfigurer[ss.DataSynchronizer] + secondarySyncBuilder ss.ComponentConfigurer[ss.DataSynchronizer] + config ss.DataSystemConfiguration +} + +// Endpoints represents custom endpoints for LaunchDarkly streaming and polling services. +// +// You may specify none, one, or both of these endpoints via WithEndpoints. If an endpoint isn't specified, +// then the default endpoint for that service will be used. +// +// This is a convenience that is identical to individually configuring polling or streaming synchronizer +// BaseURI's using their specific builder functions. +// +// To specify Relay Proxy endpoints, use WithRelayProxyEndpoints. +type Endpoints struct { + Streaming string + Polling string +} + +// DataSystemModes provides access to high level strategies for fetching data. The default mode +// is suitable for most use-cases. +type DataSystemModes struct { + endpoints Endpoints +} + +// Default is LaunchDarkly's recommended flag data acquisition strategy. Currently, it operates a +// two-phase method for obtaining data: first, it requests data from LaunchDarkly's global CDN. Then, it initiates +// a streaming connection to LaunchDarkly's Flag Delivery services to receive real-time updates. If +// the streaming connection is interrupted for an extended period of time, the SDK will automatically fall back +// to polling the global CDN for updates. 
+func (d *DataSystemModes) Default() *DataSystemConfigurationBuilder {
+	streaming := StreamingDataSourceV2()
+	if d.endpoints.Streaming != "" {
+		streaming.BaseURI(d.endpoints.Streaming)
+	}
+	polling := PollingDataSourceV2()
+	if d.endpoints.Polling != "" {
+		polling.BaseURI(d.endpoints.Polling)
+	}
+	return d.Custom().Initializers(polling.AsInitializer()).Synchronizers(streaming, polling)
+}
+
+// Streaming configures the SDK to efficiently stream flag/segment data in the background,
+// allowing evaluations to operate on the latest data with no additional latency.
+func (d *DataSystemModes) Streaming() *DataSystemConfigurationBuilder {
+	streaming := StreamingDataSourceV2()
+	if d.endpoints.Streaming != "" {
+		streaming.BaseURI(d.endpoints.Streaming)
+	}
+	return d.Custom().Synchronizers(streaming, nil)
+}
+
+// Polling configures the SDK to regularly poll an endpoint for flag/segment data in the background.
+// This is less efficient than streaming, but may be necessary in some network environments.
+func (d *DataSystemModes) Polling() *DataSystemConfigurationBuilder {
+	polling := PollingDataSourceV2()
+	if d.endpoints.Polling != "" {
+		polling.BaseURI(d.endpoints.Polling)
+	}
+	return d.Custom().Synchronizers(polling, nil)
+}
+
+// Daemon configures the SDK to read from a persistent store integration that is populated by Relay Proxy
+// or other SDKs. The SDK will not connect to LaunchDarkly. In this mode, the SDK never writes to the data store.
+func (d *DataSystemModes) Daemon(store ss.ComponentConfigurer[ss.DataStore]) *DataSystemConfigurationBuilder {
+	return d.Custom().DataStore(store, ss.DataStoreModeRead)
+}
+
+// PersistentStore is similar to Default, with the addition of a
+// persistent store integration. Before data has arrived from LaunchDarkly, the SDK is able to
+// evaluate flags using data from the persistent store. Once fresh data is available, the SDK
+// will no longer read from the persistent store, although it will keep it up-to-date.
+func (d *DataSystemModes) PersistentStore(store ss.ComponentConfigurer[ss.DataStore]) *DataSystemConfigurationBuilder { + return d.Default().DataStore(store, ss.DataStoreModeReadWrite) +} + +// Custom returns a builder suitable for creating a custom data acquisition strategy. You may configure +// how the SDK uses a Persistent Store, how the SDK obtains an initial set of data, and how the SDK keeps data +// up-to-date. +func (d *DataSystemModes) Custom() *DataSystemConfigurationBuilder { + return &DataSystemConfigurationBuilder{} +} + +// WithEndpoints configures the data system with custom endpoints for LaunchDarkly's streaming +// and polling synchronizers. This method is not necessary for most use-cases, but can be useful for +// testing or custom network configurations. +// +// Any endpoint that is not specified (empty string) will be treated as the default LaunchDarkly SaaS endpoint +// for that service. +func (d *DataSystemModes) WithEndpoints(endpoints Endpoints) *DataSystemModes { + if endpoints.Streaming != "" { + d.endpoints.Streaming = endpoints.Streaming + } + if endpoints.Polling != "" { + d.endpoints.Polling = endpoints.Polling + } + return d +} + +// WithRelayProxyEndpoints configures the data system with a single endpoint for LaunchDarkly's streaming +// and polling synchronizers. The endpoint should be Relay Proxy's base URI, for example http://localhost:8123. +func (d *DataSystemModes) WithRelayProxyEndpoints(baseURI string) *DataSystemModes { + return d.WithEndpoints(Endpoints{Streaming: baseURI, Polling: baseURI}) +} + +// DataSystem provides a high-level selection of the SDK's data acquisition strategy. Use the returned builder to +// select a mode, or to create a custom data acquisition strategy. To use LaunchDarkly's recommended mode, use Default. 
+func DataSystem() *DataSystemModes { + return &DataSystemModes{endpoints: Endpoints{ + Streaming: DefaultStreamingBaseURI, + Polling: DefaultPollingBaseURI, + }} +} + +// DataStore configures the SDK with an optional data store. The store allows the SDK to serve flag +// values before becoming connected to LaunchDarkly. +func (d *DataSystemConfigurationBuilder) DataStore(store ss.ComponentConfigurer[ss.DataStore], + storeMode ss.DataStoreMode) *DataSystemConfigurationBuilder { + d.storeBuilder = store + d.storeMode = storeMode + return d +} + +// Initializers configures the SDK with one or more DataInitializers, which are responsible for fetching +// complete payloads of flag data. The SDK will run the initializers in the order they are specified, +// stopping when one successfully returns data. +func (d *DataSystemConfigurationBuilder) Initializers( + initializers ...ss.ComponentConfigurer[ss.DataInitializer]) *DataSystemConfigurationBuilder { + d.initializerBuilders = initializers + return d +} + +// Synchronizers configures the SDK with a primary and secondary synchronizer. The primary is responsible +// for keeping the SDK's data up-to-date, and the SDK will fall back to the secondary in case of a +// primary outage. +func (d *DataSystemConfigurationBuilder) Synchronizers(primary, + secondary ss.ComponentConfigurer[ss.DataSynchronizer]) *DataSystemConfigurationBuilder { + d.primarySyncBuilder = primary + d.secondarySyncBuilder = secondary + return d +} + +// Build creates a DataSystemConfiguration from the configuration provided to the builder. 
+func (d *DataSystemConfigurationBuilder) Build( + context ss.ClientContext, +) (ss.DataSystemConfiguration, error) { + conf := d.config + if d.secondarySyncBuilder != nil && d.primarySyncBuilder == nil { + return ss.DataSystemConfiguration{}, errors.New("cannot have a secondary synchronizer without " + + "a primary synchronizer") + } + if d.storeBuilder != nil { + store, err := d.storeBuilder.Build(context) + if err != nil { + return ss.DataSystemConfiguration{}, err + } + conf.Store = store + } + for i, initializerBuilder := range d.initializerBuilders { + if initializerBuilder == nil { + return ss.DataSystemConfiguration{}, + fmt.Errorf("initializer %d is nil", i) + } + initializer, err := initializerBuilder.Build(context) + if err != nil { + return ss.DataSystemConfiguration{}, err + } + conf.Initializers = append(conf.Initializers, initializer) + } + if d.primarySyncBuilder != nil { + primarySync, err := d.primarySyncBuilder.Build(context) + if err != nil { + return ss.DataSystemConfiguration{}, err + } + conf.Synchronizers.Primary = primarySync + } + if d.secondarySyncBuilder != nil { + secondarySync, err := d.secondarySyncBuilder.Build(context) + if err != nil { + return ss.DataSystemConfiguration{}, err + } + conf.Synchronizers.Secondary = secondarySync + } + return conf, nil +} diff --git a/ldcomponents/polling_data_source_builder_v2.go b/ldcomponents/polling_data_source_builder_v2.go index 2d23a8dc..b466c791 100644 --- a/ldcomponents/polling_data_source_builder_v2.go +++ b/ldcomponents/polling_data_source_builder_v2.go @@ -7,7 +7,6 @@ import ( "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/datasourcev2" - "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" ) @@ -21,6 +20,7 @@ import ( type PollingDataSourceBuilderV2 struct { pollInterval time.Duration filterKey 
ldvalue.OptionalString + baseURI string } // PollingDataSourceV2 returns a configurable factory for using polling mode to get feature flag data. @@ -38,6 +38,7 @@ type PollingDataSourceBuilderV2 struct { func PollingDataSourceV2() *PollingDataSourceBuilderV2 { return &PollingDataSourceBuilderV2{ pollInterval: DefaultPollInterval, + baseURI: DefaultPollingBaseURI, } } @@ -53,6 +54,12 @@ func (b *PollingDataSourceBuilderV2) PollInterval(pollInterval time.Duration) *P return b } +// BaseURI sets the base URI for the polling connection. +func (b *PollingDataSourceBuilderV2) BaseURI(baseURI string) *PollingDataSourceBuilderV2 { + b.baseURI = baseURI + return b +} + // Used in tests to skip parameter validation. // //nolint:unused // it is used in tests @@ -76,20 +83,15 @@ func (b *PollingDataSourceBuilderV2) PayloadFilter(filterKey string) *PollingDat } // Build is called internally by the SDK. -func (b *PollingDataSourceBuilderV2) Build(context subsystems.ClientContext) (subsystems.DataSource, error) { +func (b *PollingDataSourceBuilderV2) Build(context subsystems.ClientContext) (subsystems.DataSynchronizer, error) { context.GetLogging().Loggers.Warn( "You should only disable the streaming API if instructed to do so by LaunchDarkly support") filterKey, wasSet := b.filterKey.Get() if wasSet && filterKey == "" { return nil, errors.New("payload filter key cannot be an empty string") } - configuredBaseURI := endpoints.SelectBaseURI( - context.GetServiceEndpoints(), - endpoints.PollingService, - context.GetLogging().Loggers, - ) cfg := datasource.PollingConfig{ - BaseURI: configuredBaseURI, + BaseURI: b.baseURI, PollInterval: b.pollInterval, FilterKey: filterKey, } @@ -97,12 +99,18 @@ func (b *PollingDataSourceBuilderV2) Build(context subsystems.ClientContext) (su context.GetDataSourceStatusReporter(), cfg), nil } +// AsInitializer converts the builder into a component configurer for a data initializer. 
The purpose +// is to allow the PollingDataSourceBuilderV2, which is normally a synchronizer, to be used as an initializer. +func (b *PollingDataSourceBuilderV2) AsInitializer() subsystems.ComponentConfigurer[subsystems.DataInitializer] { + return subsystems.AsInitializer(b) +} + // DescribeConfiguration is used internally by the SDK to inspect the configuration. func (b *PollingDataSourceBuilderV2) DescribeConfiguration(context subsystems.ClientContext) ldvalue.Value { return ldvalue.ObjectBuild(). SetBool("streamingDisabled", true). SetBool("customBaseURI", - endpoints.IsCustom(context.GetServiceEndpoints(), endpoints.PollingService)). + b.baseURI != DefaultPollingBaseURI). Set("pollingIntervalMillis", durationToMillisValue(b.pollInterval)). SetBool("usingRelayDaemon", false). Build() diff --git a/ldcomponents/polling_data_source_builder_v2_test.go b/ldcomponents/polling_data_source_builder_v2_test.go index fe0e686d..cbd07778 100644 --- a/ldcomponents/polling_data_source_builder_v2_test.go +++ b/ldcomponents/polling_data_source_builder_v2_test.go @@ -54,23 +54,28 @@ func TestPollingDataSourceV2Builder(t *testing.T) { assert.Error(t, err) }) }) - t.Run("CreateDefaultDataSource", func(t *testing.T) { - baseURI := "base" + t.Run("DoesNotUseBaseURIFromContext", func(t *testing.T) { + fdv1BaseURI := "base" + // A default FDv2 polling data source configures itself with the default + // polling URI internally - it doesn't look in the Context's ServiceEndpoints for it. This is because + // custom endpoints are injected within the Data System config. 
p := PollingDataSourceV2() dd := mocks.NewMockDataDestination(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())) statusReporter := mocks.NewMockStatusReporter() - clientContext := makeTestContextWithBaseURIs(baseURI) + + clientContext := makeTestContextWithBaseURIs(fdv1BaseURI) clientContext.BasicClientContext.DataDestination = dd clientContext.BasicClientContext.DataSourceStatusReporter = statusReporter + ds, err := p.Build(clientContext) require.NoError(t, err) require.NotNil(t, ds) defer ds.Close() pp := ds.(*datasourcev2.PollingProcessor) - assert.Equal(t, baseURI, pp.GetBaseURI()) + assert.Equal(t, DefaultPollingBaseURI, pp.GetBaseURI()) assert.Equal(t, DefaultPollInterval, pp.GetPollInterval()) }) @@ -79,13 +84,15 @@ func TestPollingDataSourceV2Builder(t *testing.T) { interval := time.Hour filter := "microservice-1" - p := PollingDataSourceV2().PollInterval(interval).PayloadFilter(filter) + p := PollingDataSourceV2().PollInterval(interval).PayloadFilter(filter).BaseURI(baseURI) dd := mocks.NewMockDataDestination(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())) statusReporter := mocks.NewMockStatusReporter() + clientContext := makeTestContextWithBaseURIs(baseURI) clientContext.BasicClientContext.DataDestination = dd clientContext.BasicClientContext.DataSourceStatusReporter = statusReporter + ds, err := p.Build(clientContext) require.NoError(t, err) require.NotNil(t, ds) diff --git a/ldcomponents/streaming_data_source_builder_v2.go b/ldcomponents/streaming_data_source_builder_v2.go index a1a88269..b5071ba6 100644 --- a/ldcomponents/streaming_data_source_builder_v2.go +++ b/ldcomponents/streaming_data_source_builder_v2.go @@ -7,7 +7,6 @@ import ( "github.com/launchdarkly/go-sdk-common/v3/ldvalue" "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/datasourcev2" - "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" ) 
@@ -21,6 +20,7 @@ import ( type StreamingDataSourceBuilderV2 struct { initialReconnectDelay time.Duration filterKey ldvalue.OptionalString + baseURI string } // StreamingDataSourceV2 returns a configurable factory for using streaming mode to get feature flag data. @@ -36,6 +36,7 @@ type StreamingDataSourceBuilderV2 struct { func StreamingDataSourceV2() *StreamingDataSourceBuilderV2 { return &StreamingDataSourceBuilderV2{ initialReconnectDelay: DefaultInitialReconnectDelay, + baseURI: DefaultStreamingBaseURI, } } @@ -57,6 +58,12 @@ func (b *StreamingDataSourceBuilderV2) InitialReconnectDelay( return b } +// BaseURI sets the base URI for the streaming connection. +func (b *StreamingDataSourceBuilderV2) BaseURI(baseURI string) *StreamingDataSourceBuilderV2 { + b.baseURI = baseURI + return b +} + // PayloadFilter sets the payload filter key for this streaming connection. The filter key // cannot be an empty string. // @@ -71,18 +78,13 @@ func (b *StreamingDataSourceBuilderV2) PayloadFilter(filterKey string) *Streamin } // Build is called internally by the SDK. -func (b *StreamingDataSourceBuilderV2) Build(context subsystems.ClientContext) (subsystems.DataSource, error) { +func (b *StreamingDataSourceBuilderV2) Build(context subsystems.ClientContext) (subsystems.DataSynchronizer, error) { filterKey, wasSet := b.filterKey.Get() if wasSet && filterKey == "" { return nil, errors.New("payload filter key cannot be an empty string") } - configuredBaseURI := endpoints.SelectBaseURI( - context.GetServiceEndpoints(), - endpoints.StreamingService, - context.GetLogging().Loggers, - ) cfg := datasource.StreamConfig{ - URI: configuredBaseURI, + URI: b.baseURI, InitialReconnectDelay: b.initialReconnectDelay, FilterKey: filterKey, } @@ -99,7 +101,7 @@ func (b *StreamingDataSourceBuilderV2) DescribeConfiguration(context subsystems. return ldvalue.ObjectBuild(). SetBool("streamingDisabled", false). 
SetBool("customStreamURI", - endpoints.IsCustom(context.GetServiceEndpoints(), endpoints.StreamingService)). + b.baseURI != DefaultStreamingBaseURI). Set("reconnectTimeMillis", durationToMillisValue(b.initialReconnectDelay)). SetBool("usingRelayDaemon", false). Build() diff --git a/ldcomponents/streaming_data_source_builder_v2_test.go b/ldcomponents/streaming_data_source_builder_v2_test.go index 7c51ea90..232da28c 100644 --- a/ldcomponents/streaming_data_source_builder_v2_test.go +++ b/ldcomponents/streaming_data_source_builder_v2_test.go @@ -55,37 +55,42 @@ func TestStreamingDataSourceV2Builder(t *testing.T) { }) }) - t.Run("CreateDefaultDataSource", func(t *testing.T) { - baseURI := "base" + t.Run("DoesNotUseBaseURIFromContext", func(t *testing.T) { + fdv1BaseURI := "base" + // A default FDv2 streaming data source configures itself with the default + // streaming URI internally - it doesn't look in the Context's ServiceEndpoints for it. This is because + // custom endpoints are injected within the Data System config. 
s := StreamingDataSourceV2() dd := mocks.NewMockDataDestination(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())) statusReporter := mocks.NewMockStatusReporter() - clientContext := makeTestContextWithBaseURIs(baseURI) + + clientContext := makeTestContextWithBaseURIs(fdv1BaseURI) clientContext.BasicClientContext.DataDestination = dd clientContext.BasicClientContext.DataSourceStatusReporter = statusReporter + ds, err := s.Build(clientContext) require.NoError(t, err) require.NotNil(t, ds) defer ds.Close() sp := ds.(*datasourcev2.StreamProcessor) - assert.Equal(t, baseURI, sp.GetBaseURI()) + assert.Equal(t, DefaultStreamingBaseURI, sp.GetBaseURI()) assert.Equal(t, DefaultInitialReconnectDelay, sp.GetInitialReconnectDelay()) assert.Equal(t, "", sp.GetFilterKey()) }) t.Run("CreateCustomizedDataSource", func(t *testing.T) { - baseURI := "base" + baseURI := "base-uri" delay := time.Hour filter := "microservice-1" - s := StreamingDataSourceV2().InitialReconnectDelay(delay).PayloadFilter(filter) + s := StreamingDataSourceV2().InitialReconnectDelay(delay).PayloadFilter(filter).BaseURI(baseURI) dd := mocks.NewMockDataDestination(datastore.NewInMemoryDataStore(sharedtest.NewTestLoggers())) statusReporter := mocks.NewMockStatusReporter() - clientContext := makeTestContextWithBaseURIs(baseURI) + clientContext := makeTestContextWithBaseURIs("not-used") clientContext.BasicClientContext.DataDestination = dd clientContext.BasicClientContext.DataSourceStatusReporter = statusReporter ds, err := s.Build(clientContext) diff --git a/subsystems/data_destination.go b/subsystems/data_destination.go index b74e3305..1dc29f07 100644 --- a/subsystems/data_destination.go +++ b/subsystems/data_destination.go @@ -19,7 +19,7 @@ type DataDestination interface { // // If persist is true, it indicates that the data should be propagated to any connected persistent // store. 
- SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) + SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) // ApplyDelta applies a set of changes to an existing basis. This operation should be atomic with // respect to any other operations that modify the store. @@ -28,5 +28,5 @@ type DataDestination interface { // // If persist is true, it indicates that the changes should be propagated to any connected persistent // store. - ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) + ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) } diff --git a/subsystems/data_source.go b/subsystems/data_source.go index eaf6aab5..f2f1bc34 100644 --- a/subsystems/data_source.go +++ b/subsystems/data_source.go @@ -1,6 +1,11 @@ package subsystems -import "io" +import ( + "context" + "io" + + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" +) // DataSource describes the interface for an object that receives feature flag data. type DataSource interface { @@ -18,3 +23,56 @@ type DataSource interface { // initialized for the first time, or determined that initialization cannot ever succeed. Start(closeWhenReady chan<- struct{}) } + +// Basis represents the initial payload of data that a data source can provide. Initializers provide this +// via Fetch, whereas Synchronizers provide it asynchronously via the injected DataDestination. +type Basis struct { + // Events is a series of events representing actions applied to data items. + Events []fdv2proto.Change + // Selector identifies this basis. + Selector fdv2proto.Selector + // Persist is true if the data source requests that the data store persist the items to any connected + // Persistent Stores. + Persist bool +} + +// DataInitializer represents a component capable of obtaining a Basis via a synchronous call. +type DataInitializer interface { + // Name returns the name of the data initializer. 
+ Name() string + // Fetch returns a Basis, or an error if the Basis could not be retrieved. If the context has expired, + // return the context's error. + Fetch(ctx context.Context) (*Basis, error) +} + +// DataSynchronizer represents a component capable of obtaining a Basis and subsequent delta updates asynchronously. +type DataSynchronizer interface { + DataInitializer + // Sync tells the data synchronizer to begin synchronizing data, starting from an optional fdv2proto.Selector. + // The selector may be nil indicating that a full Basis should be fetched. + Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) + // IsInitialized returns true if the data source has successfully initialized at some point. + // + // Once this is true, it should remain true even if a problem occurs later. + IsInitialized() bool + io.Closer +} + +type toInitializer struct { + cc ComponentConfigurer[DataSynchronizer] +} + +func (t toInitializer) Build(context ClientContext) (DataInitializer, error) { + sync, err := t.cc.Build(context) + if err != nil { + return nil, err + } + return sync, nil +} + +// AsInitializer is a helper method that converts a DataSynchronizer to a DataInitializer. This is useful because +// DataSynchronizers are generally also DataInitializers, so it's possible to use one for that purpose if the +// situation calls for it. The primary example is using a Polling synchronizer as an initializer. +func AsInitializer(cc ComponentConfigurer[DataSynchronizer]) ComponentConfigurer[DataInitializer] { + return toInitializer{cc: cc} +} diff --git a/subsystems/datasystem_configuration.go b/subsystems/datasystem_configuration.go new file mode 100644 index 00000000..881a6a92 --- /dev/null +++ b/subsystems/datasystem_configuration.go @@ -0,0 +1,22 @@ +package subsystems + +// SynchronizersConfiguration represents the config for the primary and secondary synchronizers. +type SynchronizersConfiguration struct { + // The synchronizer that is primarily active. 
+ Primary DataSynchronizer + // A fallback synchronizer if the primary fails. + Secondary DataSynchronizer +} + +// DataSystemConfiguration represents the configuration for the data system. +type DataSystemConfiguration struct { + // Store is the (optional) persistent data store. + Store DataStore + // StoreMode specifies the mode in which the persistent store should operate, if present. + StoreMode DataStoreMode + // Initializers obtain data for the SDK in a one-shot manner at startup. Their job is to get the SDK + // into a state where it is serving somewhat fresh values as fast as possible. + Initializers []DataInitializer + // Synchronizers keep the SDK's data up-to-date continuously. + Synchronizers SynchronizersConfiguration +} diff --git a/testhelpers/ldservicesv2/package.go b/testhelpers/ldservicesv2/package.go new file mode 100644 index 00000000..53655b12 --- /dev/null +++ b/testhelpers/ldservicesv2/package.go @@ -0,0 +1,2 @@ +// Package ldservicesv2 provides test helpers for generating fdv2 protocol data. +package ldservicesv2 diff --git a/testhelpers/ldservicesv2/server_sdk_data.go b/testhelpers/ldservicesv2/server_sdk_data.go new file mode 100644 index 00000000..defe26a9 --- /dev/null +++ b/testhelpers/ldservicesv2/server_sdk_data.go @@ -0,0 +1,87 @@ +package ldservicesv2 + +import ( + "encoding/json" + + "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" +) + +// ServerSDKData is a convenience type for constructing a test server-side SDK data payload for +// PollingServiceHandler or StreamingServiceHandler. Its String() method returns a JSON object with +// the expected "flags" and "segments" properties. 
+//
+//	data := NewServerSDKData().Flags(flag1, flag2)
+//	handler := PollingServiceHandler(data)
+type ServerSDKData struct {
+	FlagsMap    map[string]ldmodel.FeatureFlag `json:"flags"`
+	SegmentsMap map[string]ldmodel.Segment     `json:"segments"`
+}
+
+// NewServerSDKData creates a ServerSDKData instance.
+func NewServerSDKData() *ServerSDKData {
+	return &ServerSDKData{
+		make(map[string]ldmodel.FeatureFlag),
+		make(map[string]ldmodel.Segment),
+	}
+}
+
+// String returns the JSON encoding of the struct as a string.
+func (s *ServerSDKData) String() string {
+	bytes, _ := json.Marshal(*s)
+	return string(bytes)
+}
+
+// Flags adds the specified items to the struct's "flags" map.
+//
+// Each item may be either an object produced by KeyAndVersionItem or a real data model object from the ldmodel
+// package. The minimum requirement is that when converted to JSON, it has a "key" property.
+func (s *ServerSDKData) Flags(flags ...ldmodel.FeatureFlag) *ServerSDKData {
+	for _, flag := range flags {
+		s.FlagsMap[flag.Key] = flag
+	}
+	return s
+}
+
+// Segments adds the specified items to the struct's "segments" map.
+//
+// Each item may be either an object produced by KeyAndVersionItem or a real data model object from the ldmodel
+// package. The minimum requirement is that when converted to JSON, it has a "key" property.
+func (s *ServerSDKData) Segments(segments ...ldmodel.Segment) *ServerSDKData {
+	for _, segment := range segments {
+		s.SegmentsMap[segment.Key] = segment
+	}
+	return s
+}
+
+func mustMarshal(model any) json.RawMessage {
+	data, err := json.Marshal(model)
+	if err != nil {
+		panic(err)
+	}
+	return data
+}
+
+// ToPutObjects converts the data to a list of PutObject objects that can be fed to a mock streaming data source.
+func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { + objs := make([]fdv2proto.PutObject, 0, len(s.FlagsMap)+len(s.SegmentsMap)) + for _, flag := range s.FlagsMap { + base := fdv2proto.PutObject{ + Version: flag.Version, + Kind: fdv2proto.FlagKind, + Key: flag.Key, + Object: mustMarshal(flag), + } + objs = append(objs, base) + } + for _, segment := range s.SegmentsMap { + base := fdv2proto.PutObject{ + Version: segment.Version, + Kind: fdv2proto.SegmentKind, + Key: segment.Key, + Object: mustMarshal(segment), + } + objs = append(objs, base) + } + return objs +} diff --git a/testhelpers/ldservicesv2/streaming_protocol_builder.go b/testhelpers/ldservicesv2/streaming_protocol_builder.go new file mode 100644 index 00000000..8fd6366f --- /dev/null +++ b/testhelpers/ldservicesv2/streaming_protocol_builder.go @@ -0,0 +1,83 @@ +package ldservicesv2 + +import ( + "encoding/json" + + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/launchdarkly/go-test-helpers/v3/httphelpers" +) + +// ProtocolEvents represents a list of SSE-formatted events. +type ProtocolEvents []httphelpers.SSEEvent + +// Enqueue adds all the events to an SSEStreamController. +func (p ProtocolEvents) Enqueue(control httphelpers.SSEStreamControl) { + for _, msg := range p { + control.Enqueue(msg) + } +} + +// StreamingProtocol is a builder for creating a sequence of events that can be sent as an SSE stream. +type StreamingProtocol struct { + events ProtocolEvents +} + +// NewStreamingProtocol creates a new StreamingProtocol instance. +func NewStreamingProtocol() *StreamingProtocol { + return &StreamingProtocol{} +} + +// WithIntent adds a ServerIntent event to the protocol. +func (f *StreamingProtocol) WithIntent(intent fdv2proto.ServerIntent) *StreamingProtocol { + return f.pushEvent(intent) +} + +// WithPutObject adds a PutObject event to the protocol. 
+func (f *StreamingProtocol) WithPutObject(object fdv2proto.PutObject) *StreamingProtocol { + return f.pushEvent(object) +} + +// WithTransferred adds a PayloadTransferred event to the protocol with a given version. The state is an arbitrary +// placeholder string; if tests are added that need to verify properties related to the state, this can be +// updated. +func (f *StreamingProtocol) WithTransferred(version int) *StreamingProtocol { + return f.pushEvent(fdv2proto.NewSelector("[p:17YNC7XBH88Y6RDJJ48EKPCJS7:53]", version)) +} + +// WithPutObjects adds multiple PutObject events to the protocol. +func (f *StreamingProtocol) WithPutObjects(objects []fdv2proto.PutObject) *StreamingProtocol { + for _, object := range objects { + f.WithPutObject(object) + } + return f +} + +func (f *StreamingProtocol) pushEvent(data fdv2proto.Event) *StreamingProtocol { + marshalled, err := json.Marshal(data) + if err != nil { + panic(err) + } + f.events = append(f.events, httphelpers.SSEEvent{Event: string(data.Name()), Data: string(marshalled)}) + return f +} + +// HasNext returns true if there are more events in the protocol. +func (f *StreamingProtocol) HasNext() bool { + return len(f.events) != 0 +} + +// Next returns the next event in the protocol, popping the event from protocol's internal queue. +func (f *StreamingProtocol) Next() httphelpers.SSEEvent { + if !f.HasNext() { + panic("protocol has no events") + } + event := f.events[0] + f.events = f.events[1:] + return event +} + +// Enqueue adds all the events to an SSEStreamController. 
+func (f *StreamingProtocol) Enqueue(control httphelpers.SSEStreamControl) { + f.events.Enqueue(control) + f.events = nil +} diff --git a/testservice/sdk_client_entity.go b/testservice/sdk_client_entity.go index a588bcb0..0430118e 100644 --- a/testservice/sdk_client_entity.go +++ b/testservice/sdk_client_entity.go @@ -26,6 +26,7 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/interfaces/flagstate" "github.com/launchdarkly/go-server-sdk/v7/ldcomponents" "github.com/launchdarkly/go-server-sdk/v7/ldhooks" + "github.com/launchdarkly/go-server-sdk/v7/subsystems" "github.com/launchdarkly/go-server-sdk/v7/testservice/servicedef" "github.com/launchdarkly/go-sdk-common/v3/ldcontext" @@ -368,89 +369,186 @@ func makeSDKConfig(config servicedef.SDKConfigParams, sdkLog ldlog.Loggers) (ld. ret := ld.Config{} ret.Logging = ldcomponents.Logging().Loggers(sdkLog) - if config.ServiceEndpoints != nil { - ret.ServiceEndpoints.Streaming = config.ServiceEndpoints.Streaming - ret.ServiceEndpoints.Polling = config.ServiceEndpoints.Polling - ret.ServiceEndpoints.Events = config.ServiceEndpoints.Events - } + if config.DataSystem != nil { + dataSystemBuilder := ldcomponents.DataSystem().Custom() - if config.Streaming != nil { - if config.Streaming.BaseURI != "" { - ret.ServiceEndpoints.Streaming = config.Streaming.BaseURI - } - builder := ldcomponents.StreamingDataSource() - if config.Streaming.InitialRetryDelayMS != nil { - builder.InitialReconnectDelay(time.Millisecond * time.Duration(*config.Streaming.InitialRetryDelayMS)) - } - if config.Streaming.Filter.IsDefined() { - builder.PayloadFilter(config.Streaming.Filter.String()) - } - ret.DataSource = builder - } else if config.Polling != nil { - if config.Polling.BaseURI != "" { - ret.ServiceEndpoints.Polling = config.Polling.BaseURI - } - builder := ldcomponents.PollingDataSource() - if config.Polling.PollIntervalMS != nil { - builder.PollInterval(time.Millisecond * time.Duration(*config.Polling.PollIntervalMS)) - } - if 
config.Polling.Filter.IsDefined() { - builder.PayloadFilter(config.Polling.Filter.String()) - } - ret.DataSource = builder - } else if config.ServiceEndpoints == nil { - ret.DataSource = ldcomponents.ExternalUpdatesOnly() - } + if len(config.DataSystem.Initializers) > 0 { + initializers := make([]subsystems.ComponentConfigurer[subsystems.DataInitializer], len(config.DataSystem.Initializers)) + for idx, initializer := range config.DataSystem.Initializers { + if initializer.Polling != nil { + builder := ldcomponents.PollingDataSourceV2() + if initializer.Polling.PollIntervalMS != nil { + builder.PollInterval(time.Millisecond * time.Duration(*initializer.Polling.PollIntervalMS)) + } - if config.PersistentDataStore != nil { - var builder *ldcomponents.PersistentDataStoreBuilder - switch config.PersistentDataStore.Store.Type { - case servicedef.Redis: - dsBuilder := ldredis.DataStore().URL(config.PersistentDataStore.Store.DSN) - if config.PersistentDataStore.Store.Prefix != nil { - dsBuilder.Prefix(*config.PersistentDataStore.Store.Prefix) - } - builder = ldcomponents.PersistentDataStore(dsBuilder) - case servicedef.Consul: - dsBuilder := ldconsul.DataStore().Address(config.PersistentDataStore.Store.DSN) - if config.PersistentDataStore.Store.Prefix != nil { - dsBuilder.Prefix(*config.PersistentDataStore.Store.Prefix) + if initializer.Polling.BaseURI != "" { + builder.BaseURI(initializer.Polling.BaseURI) + } + + if config.DataSystem.PayloadFilter != nil { + builder.PayloadFilter(*config.DataSystem.PayloadFilter) + } + + initializers[idx] = builder.AsInitializer() + } } - builder = ldcomponents.PersistentDataStore(dsBuilder) - case servicedef.DynamoDB: - cfg, err := awsconfig.LoadDefaultConfig(context.Background(), - awsconfig.WithRegion("us-east-1"), - awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "dummy")), - ) + dataSystemBuilder.Initializers(initializers...) 
+ } + + if config.DataSystem.Synchronizers != nil { + primary, err := makeSynchronizerConfig(config.DataSystem.Synchronizers.Primary, config, &ret) if err != nil { return ret, err } - svc := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { - o.EndpointResolver = dynamodb.EndpointResolverFromURL(config.PersistentDataStore.Store.DSN) - }) - - dsBuilder := lddynamodb.DataStore("sdk-contract-tests").DynamoClient(svc) - if config.PersistentDataStore.Store.Prefix != nil { - dsBuilder.Prefix(*config.PersistentDataStore.Store.Prefix) + var secondary subsystems.ComponentConfigurer[subsystems.DataSynchronizer] + if config.DataSystem.Synchronizers.Secondary != nil { + secondary, err = makeSynchronizerConfig(*config.DataSystem.Synchronizers.Secondary, config, &ret) + if err != nil { + return ret, err + } } - builder = ldcomponents.PersistentDataStore(dsBuilder) - default: - return ret, errors.New(fmt.Sprintf("unsupported data store type (%s) requested", config.PersistentDataStore.Store.Type)) + dataSystemBuilder.Synchronizers(primary, secondary) } - if builder != nil { - switch config.PersistentDataStore.Cache.Mode { + if config.DataSystem.Store != nil && config.DataSystem.Store.PersistentDataStore != nil { + var builder *ldcomponents.PersistentDataStoreBuilder + switch config.DataSystem.Store.PersistentDataStore.Store.Type { + case servicedef.Redis: + dsBuilder := ldredis.DataStore().URL(config.DataSystem.Store.PersistentDataStore.Store.DSN) + if config.DataSystem.Store.PersistentDataStore.Store.Prefix != nil { + dsBuilder.Prefix(*config.DataSystem.Store.PersistentDataStore.Store.Prefix) + } + builder = ldcomponents.PersistentDataStore(dsBuilder) + case servicedef.Consul: + dsBuilder := ldconsul.DataStore().Address(config.DataSystem.Store.PersistentDataStore.Store.DSN) + if config.DataSystem.Store.PersistentDataStore.Store.Prefix != nil { + dsBuilder.Prefix(*config.DataSystem.Store.PersistentDataStore.Store.Prefix) + } + builder = 
ldcomponents.PersistentDataStore(dsBuilder) + case servicedef.DynamoDB: + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithRegion("us-east-1"), + awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "dummy")), + ) + if err != nil { + return ret, err + } + + svc := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.EndpointResolver = dynamodb.EndpointResolverFromURL(config.DataSystem.Store.PersistentDataStore.Store.DSN) + }) + + dsBuilder := lddynamodb.DataStore("sdk-contract-tests").DynamoClient(svc) + if config.DataSystem.Store.PersistentDataStore.Store.Prefix != nil { + dsBuilder.Prefix(*config.DataSystem.Store.PersistentDataStore.Store.Prefix) + } + + builder = ldcomponents.PersistentDataStore(dsBuilder) + default: + return ret, errors.New(fmt.Sprintf("unsupported data store type (%s) requested", config.DataSystem.Store.PersistentDataStore.Store.Type)) + } + + switch config.DataSystem.Store.PersistentDataStore.Cache.Mode { case servicedef.TTL: - builder.CacheSeconds(*config.PersistentDataStore.Cache.TTL) + builder.CacheSeconds(*config.DataSystem.Store.PersistentDataStore.Cache.TTL) case servicedef.Infinite: builder.CacheForever() case servicedef.Off: builder.NoCaching() } - ret.DataStore = builder + dataSystemBuilder.DataStore(builder, subsystems.DataStoreMode(config.DataSystem.StoreMode)) + } + + sdkLog.Debugf("Data system configuration: %+v", dataSystemBuilder) + ret.DataSystem = dataSystemBuilder + } else { + if config.ServiceEndpoints != nil { + ret.ServiceEndpoints.Streaming = config.ServiceEndpoints.Streaming + ret.ServiceEndpoints.Polling = config.ServiceEndpoints.Polling + ret.ServiceEndpoints.Events = config.ServiceEndpoints.Events + } + + if config.Streaming != nil { + if config.Streaming.BaseURI != "" { + ret.ServiceEndpoints.Streaming = config.Streaming.BaseURI + } + builder := ldcomponents.StreamingDataSource() + if config.Streaming.InitialRetryDelayMS != nil { + 
builder.InitialReconnectDelay(time.Millisecond * time.Duration(*config.Streaming.InitialRetryDelayMS)) + } + if config.Streaming.Filter.IsDefined() { + builder.PayloadFilter(config.Streaming.Filter.String()) + } + ret.DataSource = builder + } else if config.Polling != nil { + if config.Polling.BaseURI != "" { + ret.ServiceEndpoints.Polling = config.Polling.BaseURI + } + builder := ldcomponents.PollingDataSource() + if config.Polling.PollIntervalMS != nil { + builder.PollInterval(time.Millisecond * time.Duration(*config.Polling.PollIntervalMS)) + } + if config.Polling.Filter.IsDefined() { + builder.PayloadFilter(config.Polling.Filter.String()) + } + ret.DataSource = builder + } else if config.ServiceEndpoints == nil { + ret.DataSource = ldcomponents.ExternalUpdatesOnly() + } + + if config.PersistentDataStore != nil { + var builder *ldcomponents.PersistentDataStoreBuilder + switch config.PersistentDataStore.Store.Type { + case servicedef.Redis: + dsBuilder := ldredis.DataStore().URL(config.PersistentDataStore.Store.DSN) + if config.PersistentDataStore.Store.Prefix != nil { + dsBuilder.Prefix(*config.PersistentDataStore.Store.Prefix) + } + builder = ldcomponents.PersistentDataStore(dsBuilder) + case servicedef.Consul: + dsBuilder := ldconsul.DataStore().Address(config.PersistentDataStore.Store.DSN) + if config.PersistentDataStore.Store.Prefix != nil { + dsBuilder.Prefix(*config.PersistentDataStore.Store.Prefix) + } + builder = ldcomponents.PersistentDataStore(dsBuilder) + case servicedef.DynamoDB: + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithRegion("us-east-1"), + awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "dummy")), + ) + if err != nil { + return ret, err + } + + svc := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.EndpointResolver = dynamodb.EndpointResolverFromURL(config.PersistentDataStore.Store.DSN) + }) + + dsBuilder := 
lddynamodb.DataStore("sdk-contract-tests").DynamoClient(svc) + if config.PersistentDataStore.Store.Prefix != nil { + dsBuilder.Prefix(*config.PersistentDataStore.Store.Prefix) + } + + builder = ldcomponents.PersistentDataStore(dsBuilder) + default: + return ret, errors.New(fmt.Sprintf("unsupported data store type (%s) requested", config.PersistentDataStore.Store.Type)) + } + + if builder != nil { + switch config.PersistentDataStore.Cache.Mode { + case servicedef.TTL: + builder.CacheSeconds(*config.PersistentDataStore.Cache.TTL) + case servicedef.Infinite: + builder.CacheForever() + case servicedef.Off: + builder.NoCaching() + } + + ret.DataStore = builder + } } } @@ -522,3 +620,42 @@ func asJSON(value interface{}) string { ret, _ := json.Marshal(value) return string(ret) } + +func makeSynchronizerConfig(synchronizer servicedef.Synchronizer, configParams servicedef.SDKConfigParams, config *ld.Config) (subsystems.ComponentConfigurer[subsystems.DataSynchronizer], error) { + if synchronizer.Polling != nil { + if config.ServiceEndpoints.Polling == "" { + config.ServiceEndpoints.Polling = synchronizer.Polling.BaseURI + } + + builder := ldcomponents.PollingDataSourceV2() + if synchronizer.Polling.PollIntervalMS != nil { + builder.PollInterval(time.Millisecond * time.Duration(*synchronizer.Polling.PollIntervalMS)) + } + if synchronizer.Polling.BaseURI != "" { + builder.BaseURI(synchronizer.Polling.BaseURI) + } + if configParams.DataSystem.PayloadFilter != nil { + builder.PayloadFilter(*configParams.DataSystem.PayloadFilter) + } + + return builder, nil + } else if synchronizer.Streaming != nil { + if config.ServiceEndpoints.Streaming == "" { + config.ServiceEndpoints.Streaming = synchronizer.Streaming.BaseURI + } + + builder := ldcomponents.StreamingDataSourceV2() + if synchronizer.Streaming.InitialRetryDelayMS != nil { + builder.InitialReconnectDelay(time.Millisecond * time.Duration(*synchronizer.Streaming.InitialRetryDelayMS)) + } + if synchronizer.Streaming.BaseURI != "" 
{ + builder.BaseURI(synchronizer.Streaming.BaseURI) + } + if configParams.DataSystem.PayloadFilter != nil { + builder.PayloadFilter(*configParams.DataSystem.PayloadFilter) + } + return builder, nil + } + + return nil, fmt.Errorf("no valid synchronizer configuration found") +} diff --git a/testservice/servicedef/sdk_config.go b/testservice/servicedef/sdk_config.go index 18072fab..19f8ee09 100644 --- a/testservice/servicedef/sdk_config.go +++ b/testservice/servicedef/sdk_config.go @@ -17,6 +17,7 @@ type SDKConfigParams struct { Tags *SDKConfigTagsParams `json:"tags,omitempty"` Hooks *SDKConfigHooksParams `json:"hooks,omitempty"` PersistentDataStore *SDKConfigPersistentDataStoreParams `json:"persistentDataStore,omitempty"` + DataSystem *DataSystem `json:"dataSystem,omitempty"` } type SDKConfigServiceEndpointsParams struct { @@ -25,6 +26,43 @@ type SDKConfigServiceEndpointsParams struct { Events string `json:"events,omitempty"` } +type DataStoreMode int + +const ( + // DataStoreModeRead indicates that the data store is read-only. Data will never be written back to the store by + // the SDK. + DataStoreModeRead = 0 + // DataStoreModeReadWrite indicates that the data store is read-write. Data from initializers/synchronizers may be + // written to the store as necessary. 
+ DataStoreModeReadWrite = 1 +) + +type DataSystem struct { + Store *DataStore `json:"store,omitempty"` + StoreMode DataStoreMode `json:"storeMode"` + Initializers []DataInitializer `json:"initializers"` + Synchronizers *Synchronizers `json:"synchronizers,omitempty"` + PayloadFilter *string `json:"payloadFilter,omitempty"` +} + +type DataStore struct { + PersistentDataStore *SDKConfigPersistentDataStoreParams `json:"persistentDataStore,omitempty"` +} + +type DataInitializer struct { + Polling *SDKConfigPollingParams `json:"polling,omitempty"` +} + +type Synchronizers struct { + Primary Synchronizer `json:"primary"` + Secondary *Synchronizer `json:"secondary,omitempty"` +} + +type Synchronizer struct { + Streaming *SDKConfigStreamingParams `json:"streaming,omitempty"` + Polling *SDKConfigPollingParams `json:"polling,omitempty"` +} + type SDKConfigStreamingParams struct { BaseURI string `json:"baseUri,omitempty"` InitialRetryDelayMS *ldtime.UnixMillisecondTime `json:"initialRetryDelayMs,omitempty"`