diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index ea0a6dcc..d81837ad 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -22,7 +22,7 @@ const ( // PollingRequester allows PollingProcessor to delegate fetching data to another component. // This is useful for testing the PollingProcessor without needing to set up a test HTTP server. type PollingRequester interface { - Request() (*PollingResponse, error) + Request() (*fdv2proto.ChangeSet, error) BaseURI() string FilterKey() string } @@ -86,11 +86,11 @@ func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) if err != nil { return nil, err } - return &subsystems.Basis{Events: basis.Events(), Selector: basis.Selector(), Persist: true}, nil + return &subsystems.Basis{Events: basis.Changes(), Selector: basis.Selector(), Persist: true}, nil } //nolint:revive // DataSynchronizer method. -func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ *fdv2proto.Selector) { +func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) { pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval) ticker := newTickerWithInitialTick(pp.pollInterval) @@ -159,21 +159,22 @@ func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ *fdv2proto.Se } func (pp *PollingProcessor) poll() error { - response, err := pp.requester.Request() + changeSet, err := pp.requester.Request() if err != nil { return err } - if response.Cached() { - return nil - } - - switch response.Intent() { + code := changeSet.IntentCode() + switch code { case fdv2proto.IntentTransferFull: - pp.dataDestination.SetBasis(response.Events(), response.Selector(), true) + pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) case fdv2proto.IntentTransferChanges: - pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true) + pp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true) + case fdv2proto.IntentNone: + { + // no-op, we are already up-to-date. + } } return nil diff --git a/internal/datasourcev2/polling_http_request.go b/internal/datasourcev2/polling_http_request.go index e6512116..ec09f081 100644 --- a/internal/datasourcev2/polling_http_request.go +++ b/internal/datasourcev2/polling_http_request.go @@ -2,21 +2,17 @@ package datasourcev2 import ( "encoding/json" - "errors" "fmt" "io" "net/http" "net/url" - "github.com/launchdarkly/go-jsonstream/v3/jreader" "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/gregjones/httpcache" "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - - "github.com/gregjones/httpcache" "golang.org/x/exp/maps" ) @@ -70,7 +66,7 @@ func (r *pollingRequester) FilterKey() string { return r.filterKey } -func (r *pollingRequester) Request() (*PollingResponse, error) { +func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) { if r.loggers.IsDebugEnabled() { r.loggers.Debug("Polling LaunchDarkly for feature flag updates") } @@ -80,125 +76,49 @@ func (r *pollingRequester) Request() (*PollingResponse, error) { return nil, err } if cached { - return NewCachedPollingResponse(), nil + return fdv2proto.NewChangeSetBuilder().NoChanges(), nil } var payload fdv2proto.PollingPayload - payloadSelector := fdv2proto.NoSelector() if err = json.Unmarshal(body, &payload); err != nil { return nil, malformedJSONError{err} } - parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { - dataKind, err := kind.ToFDV1() - if err != nil { - return ldstoretypes.ItemDescriptor{}, err - } - item, err := dataKind.DeserializeFromJSONReader(&r) - return item, err - } - - updates := make([]fdv2proto.Event, 0, len(payload.Events)) - - var intentCode fdv2proto.IntentCode + changeSet := fdv2proto.NewChangeSetBuilder() for _, event := range payload.Events { switch event.Name { case fdv2proto.EventServerIntent: - { - var serverIntent fdv2proto.ServerIntent - err := json.Unmarshal(event.Data, &serverIntent) - if err != nil { - return nil, err - } else if len(serverIntent.Payloads) == 0 { - return nil, errors.New("server-intent event has no payloads") - } - - intentCode = serverIntent.Payloads[0].Code - if intentCode == fdv2proto.IntentNone { - return NewCachedPollingResponse(), nil - } + var serverIntent fdv2proto.ServerIntent + err := json.Unmarshal(event.Data, &serverIntent) + if err != nil { + return nil, err + } + if err := changeSet.Start(serverIntent); err != nil { + return nil, err } case fdv2proto.EventPutObject: - { - r := jreader.NewReader(event.Data) - - var ( - key string - kind fdv2proto.ObjectKind - item ldstoretypes.ItemDescriptor - err error - version int - ) - - for obj := r.Object().WithRequiredProperties([]string{ - versionField, kindField, keyField, objectField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - case keyField: - key = r.String() - case objectField: - item, err = parseItem(r, kind) - if err != nil { - return nil, err - } - } - } - updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item.Item, Version: version}) + var put fdv2proto.PutObject + if err := json.Unmarshal(event.Data, &put); err != nil { + return nil, err } + changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object) case fdv2proto.EventDeleteObject: - { - r := jreader.NewReader(event.Data) - - var ( - version int - kind fdv2proto.ObjectKind - key string - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - } - } - updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) + var deleteObject fdv2proto.DeleteObject + if err := json.Unmarshal(event.Data, &deleteObject); err != nil { + return nil, err } + changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version) case fdv2proto.EventPayloadTransferred: - r := jreader.NewReader(event.Data) - - var ( - state string - version int - ) - - for obj := r.Object().WithRequiredProperties([]string{stateField, versionField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case stateField: - state = r.String() - } + var selector fdv2proto.Selector + if err := json.Unmarshal(event.Data, &selector); err != nil { + return nil, err } - payloadSelector = fdv2proto.NewSelector(state, version) + return changeSet.Finish(selector) } } - if intentCode == "" { - return nil, errors.New("no server-intent event found in polling response") - } - - return NewPollingResponse(intentCode, updates, payloadSelector), nil + return nil, fmt.Errorf("malformed protocol") } func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) { diff --git a/internal/datasourcev2/polling_response.go b/internal/datasourcev2/polling_response.go deleted file mode 100644 index 80b2c36f..00000000 --- a/internal/datasourcev2/polling_response.go +++ /dev/null @@ -1,48 +0,0 @@ -package datasourcev2 - -import "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - -// PollingResponse represents the result of a polling request. -type PollingResponse struct { - events []fdv2proto.Event - cached bool - intent fdv2proto.IntentCode - selector *fdv2proto.Selector -} - -// NewCachedPollingResponse indicates that the response has not changed. -func NewCachedPollingResponse() *PollingResponse { - return &PollingResponse{ - cached: true, - } -} - -// NewPollingResponse indicates that data was received. -func NewPollingResponse(intent fdv2proto.IntentCode, events []fdv2proto.Event, - selector *fdv2proto.Selector) *PollingResponse { - return &PollingResponse{ - events: events, - intent: intent, - selector: selector, - } -} - -// Events returns the events in the response. -func (p *PollingResponse) Events() []fdv2proto.Event { - return p.events -} - -// Cached returns true if the response was cached, meaning data has not changed. -func (p *PollingResponse) Cached() bool { - return p.cached -} - -// Intent returns the server intent code of the response. -func (p *PollingResponse) Intent() fdv2proto.IntentCode { - return p.intent -} - -// Selector returns the Selector of the response. -func (p *PollingResponse) Selector() *fdv2proto.Selector { - return p.selector -} diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index ab21ed8b..4e63c06b 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -12,7 +12,8 @@ import ( "context" - "github.com/launchdarkly/go-jsonstream/v3/jreader" + es "github.com/launchdarkly/eventsource" + "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-sdk-common/v3/ldtime" ldevents "github.com/launchdarkly/go-sdk-events/v3" @@ -21,21 +22,11 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - - es "github.com/launchdarkly/eventsource" "golang.org/x/exp/maps" ) const ( - keyField = "key" - kindField = "kind" - versionField = "version" - objectField = "object" - - stateField = "state" - streamReadTimeout = 5 * time.Minute // the LaunchDarkly stream should send a heartbeat comment every 3 minutes streamMaxRetryDelay = 30 * time.Second streamRetryResetInterval = 60 * time.Second @@ -46,11 +37,6 @@ const ( streamingWillRetryMessage = "will retry" ) -type changeSet struct { - intent *fdv2proto.ServerIntent - events []es.Event -} - // Implementation of the streaming data source, not including the lower-level SSE implementation which is in // the eventsource package. // @@ -139,7 +125,7 @@ func (sp *StreamProcessor) IsInitialized() bool { } //nolint:revive // DataSynchronizer method. -func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}, selector *fdv2proto.Selector) { +func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) { sp.loggers.Info("Starting LaunchDarkly streaming connection") go sp.subscribe(closeWhenReady, selector) } @@ -155,9 +141,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } }() - currentChangeSet := changeSet{ - events: make([]es.Event, 0), - } + changeSetBuilder := fdv2proto.NewChangeSetBuilder() for { select { @@ -180,6 +164,9 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< shouldRestart := false gotMalformedEvent := func(event es.Event, err error) { + // The protocol should "forget" anything that happens upon receiving an error. + changeSetBuilder = fdv2proto.NewChangeSetBuilder() + if event == nil { sp.loggers.Errorf( "Received streaming events with malformed JSON data (%s); will restart stream", @@ -208,29 +195,35 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< case fdv2proto.EventHeartbeat: // Swallow the event and move on. case fdv2proto.EventServerIntent: - //nolint: godox - // TODO: Replace all this json unmarshalling with a nicer jreader implementation. + var serverIntent fdv2proto.ServerIntent err := json.Unmarshal([]byte(event.Data()), &serverIntent) if err != nil { gotMalformedEvent(event, err) break - } else if len(serverIntent.Payloads) == 0 { - gotMalformedEvent(event, errors.New("server-intent event has no payloads")) - break } - if serverIntent.Payloads[0].Code == "none" { - sp.loggers.Info("Server intent is none, skipping") - continue + if err := changeSetBuilder.Start(serverIntent); err != nil { + gotMalformedEvent(event, err) + break } - currentChangeSet = changeSet{events: make([]es.Event, 0), intent: &serverIntent} - case fdv2proto.EventPutObject: - currentChangeSet.events = append(currentChangeSet.events, event) + var p fdv2proto.PutObject + err := json.Unmarshal([]byte(event.Data()), &p) + if err != nil { + gotMalformedEvent(event, err) + break + } + changeSetBuilder.AddPut(p.Kind, p.Key, p.Version, p.Object) case fdv2proto.EventDeleteObject: - currentChangeSet.events = append(currentChangeSet.events, event) + var d fdv2proto.DeleteObject + err := json.Unmarshal([]byte(event.Data()), &d) + if err != nil { + gotMalformedEvent(event, err) + break + } + changeSetBuilder.AddDelete(d.Kind, d.Key, d.Version) case fdv2proto.EventGoodbye: var goodbye fdv2proto.Goodbye err := json.Unmarshal([]byte(event.Data()), &goodbye) @@ -246,51 +239,41 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< var errorData fdv2proto.Error err := json.Unmarshal([]byte(event.Data()), &errorData) if err != nil { - //nolint: godox - // TODO: Confirm that an error means we have to discard - // everything that has come before. - currentChangeSet = changeSet{events: make([]es.Event, 0)} gotMalformedEvent(event, err) break } sp.loggers.Errorf("Error on %s: %s", errorData.PayloadID, errorData.Reason) - currentChangeSet = changeSet{events: make([]es.Event, 0)} - //nolint: godox - // TODO: Do we need to restart here? + // The protocol should "forget" anything that has happened, and expect that we will receive + // more messages in the future (starting with a server intent.) + changeSetBuilder = fdv2proto.NewChangeSetBuilder() case fdv2proto.EventPayloadTransferred: - selector := &fdv2proto.Selector{} - - err := json.Unmarshal([]byte(event.Data()), selector) + var selector fdv2proto.Selector + err := json.Unmarshal([]byte(event.Data()), &selector) if err != nil { gotMalformedEvent(event, err) break } - currentChangeSet.events = append(currentChangeSet.events, event) - updates, err := deserializeEvents(currentChangeSet.events) + // After calling Finish, the builder is ready to receive a new changeset. + changeSet, err := changeSetBuilder.Finish(selector) if err != nil { - sp.loggers.Errorf("Error processing changeset: %s", err) gotMalformedEvent(nil, err) break } - if currentChangeSet.intent == nil { - sp.dataDestination.ApplyDelta(updates, selector, true) - } else { - switch currentChangeSet.intent.Payloads[0].Code { - case fdv2proto.IntentTransferFull: - { - sp.dataDestination.SetBasis(updates, selector, true) - sp.setInitializedAndNotifyClient(true, closeWhenReady) - } - case fdv2proto.IntentTransferChanges: - sp.dataDestination.ApplyDelta(updates, selector, true) - } + code := changeSet.IntentCode() + switch code { + case fdv2proto.IntentTransferFull: + sp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) + sp.setInitializedAndNotifyClient(true, closeWhenReady) + case fdv2proto.IntentTransferChanges: + sp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true) + case fdv2proto.IntentNone: + // No changes, our existing data is up-to-date for the moment. } - currentChangeSet = changeSet{events: make([]es.Event, 0)} default: sp.loggers.Infof("Unexpected event found in stream: %s", event.Event()) } @@ -309,9 +292,9 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } } -func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, selector *fdv2proto.Selector) { +func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) { path := endpoints.AddPath(sp.cfg.URI, endpoints.StreamingRequestPath) - if selector.IsSet() { + if selector.IsDefined() { path = path + "?basis=" + selector.State() } req, reqErr := http.NewRequest("GET", path, nil) @@ -464,77 +447,4 @@ func (sp *StreamProcessor) GetFilterKey() string { return sp.cfg.FilterKey } -func deserializeEvents(events []es.Event) ([]fdv2proto.Event, error) { - updates := make([]fdv2proto.Event, 0, len(events)) - - parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { - dataKind, err := kind.ToFDV1() - if err != nil { - return ldstoretypes.ItemDescriptor{}, err - } - item, err := dataKind.DeserializeFromJSONReader(&r) - return item, err - } - - for _, event := range events { - switch fdv2proto.EventName(event.Event()) { - case fdv2proto.EventPutObject: - r := jreader.NewReader([]byte(event.Data())) - - var ( - kind fdv2proto.ObjectKind - key string - version int - item ldstoretypes.ItemDescriptor - err error - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField, objectField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - case objectField: - item, err = parseItem(r, kind) - if err != nil { - return updates, err - } - } - } - updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item.Item, Version: version}) - case fdv2proto.EventDeleteObject: - r := jreader.NewReader([]byte(event.Data())) - - var ( - version int - kind fdv2proto.ObjectKind - key string - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - } - } - updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) - } - } - - return updates, nil -} - // vim: foldmethod=marker foldlevel=0 diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index f387e786..0c77e5c7 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -69,8 +69,8 @@ type FDv2 struct { status interfaces.DataSourceStatus } -func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration], clientContext *internal.ClientContextImpl) (*FDv2, error) { - +func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration], + clientContext *internal.ClientContextImpl) (*FDv2, error) { store := NewStore(clientContext.GetLogging().Loggers) bcasters := &broadcasters{ @@ -180,7 +180,7 @@ func (f *FDv2) runPersistentStoreOutageRecovery(ctx context.Context, statuses <- } } -func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) *fdv2proto.Selector { +func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{}) fdv2proto.Selector { for _, initializer := range f.initializers { f.loggers.Infof("Attempting to initialize via %s", initializer.Name()) basis, err := initializer.Fetch(ctx) @@ -201,7 +201,7 @@ func (f *FDv2) runInitializers(ctx context.Context, closeWhenReady chan struct{} return fdv2proto.NoSelector() } -func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}, selector *fdv2proto.Selector) { +func (f *FDv2) runSynchronizers(ctx context.Context, closeWhenReady chan struct{}, selector fdv2proto.Selector) { // If the SDK was configured with no synchronizer, then (assuming no initializer succeeded), we should // trigger the ready signal to let the call to MakeClient unblock immediately. if f.primarySync == nil { @@ -249,7 +249,7 @@ func (f *FDv2) Store() subsystems.ReadOnlyStore { } func (f *FDv2) DataAvailability() DataAvailability { - if f.store.Selector().IsSet() { + if f.store.Selector().IsDefined() { return Refreshed } if f.store.IsInitialized() { diff --git a/internal/datasystem/store.go b/internal/datasystem/store.go index 2cb88318..b09615e7 100644 --- a/internal/datasystem/store.go +++ b/internal/datasystem/store.go @@ -68,7 +68,7 @@ type Store struct { active subsystems.ReadOnlyStore // Identifies the current data. - selector *fdv2proto.Selector + selector fdv2proto.Selector mu sync.RWMutex @@ -134,7 +134,7 @@ func (s *Store) WithPersistence(persistent subsystems.DataStore, mode subsystems } // Selector returns the current selector. -func (s *Store) Selector() *fdv2proto.Selector { +func (s *Store) Selector() fdv2proto.Selector { s.mu.RLock() defer s.mu.RUnlock() return s.selector @@ -152,8 +152,12 @@ func (s *Store) Close() error { // SetBasis sets the basis of the store. Any existing data is discarded. To request data persistence, // set persist to true. -func (s *Store) SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { - collections := fdv2proto.ToStorableItems(events) +func (s *Store) SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) { + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + s.loggers.Errorf("store: couldn't set basis due to malformed data: %v", err) + return + } s.mu.Lock() defer s.mu.Unlock() @@ -177,8 +181,12 @@ func (s *Store) shouldPersist() bool { // ApplyDelta applies a delta update to the store. ApplyDelta should not be called until SetBasis has been called. // To request data persistence, set persist to true. -func (s *Store) ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { - collections := fdv2proto.ToStorableItems(events) +func (s *Store) ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) { + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + s.loggers.Errorf("store: couldn't apply delta due to malformed data: %v", err) + return + } s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/datasystem/store_test.go b/internal/datasystem/store_test.go index f6701527..5d6e76fe 100644 --- a/internal/datasystem/store_test.go +++ b/internal/datasystem/store_test.go @@ -1,13 +1,16 @@ package datasystem import ( + "encoding/json" "errors" + "fmt" "math/rand" "sync" "testing" "time" - "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel" + "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" @@ -45,7 +48,7 @@ func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { none := fdv2proto.NoSelector() tests := []struct { name string - selector *fdv2proto.Selector + selector fdv2proto.Selector persist bool }{ {"with selector, persist", v1, true}, @@ -58,12 +61,28 @@ func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { logCapture := ldlogtest.NewMockLog() store := NewStore(logCapture.Loggers) defer store.Close() - store.SetBasis([]fdv2proto.Event{}, tt.selector, tt.persist) + store.SetBasis([]fdv2proto.Change{}, tt.selector, tt.persist) assert.True(t, store.IsInitialized()) }) } } +func MustMarshal(model any) json.RawMessage { + data, err := json.Marshal(model) + if err != nil { + panic(err) + } + return data +} + +func MinimalFlag(key string, version int) json.RawMessage { + return []byte(fmt.Sprintf(`{"key":"`+key+`","version": %v}`, version)) +} + +func MinimalSegment(key string, version int) json.RawMessage { + return []byte(fmt.Sprintf(`{"key":"`+key+`","version": %v}`, version)) +} + func TestStore_Commit(t *testing.T) { t.Run("absence of persistent store doesn't cause error when committing", func(t *testing.T) { logCapture := ldlogtest.NewMockLog() @@ -81,24 +100,26 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) defer store.Close() - // The store receives data as a list of events, but the persistent store receives them as an + // The store receives data as a list of changes, but the persistent store receives them as an // []ldstoretypes.Collection. - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldmodel.FeatureFlag{}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: ldmodel.Segment{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } + // OK: basically we need to match up the JSON with the FlagBuilder stuff for this to work, a naive marshal won't work. + // The original data system PR has some infra for this. Maybe bring it in in this pr. output := []ldstoretypes.Collection{ { Kind: ldstoreimpl.Features(), Items: []ldstoretypes.KeyedItemDescriptor{ - {Key: "foo", Item: ldstoretypes.ItemDescriptor{Version: 1, Item: ldmodel.FeatureFlag{}}}, + {Key: "foo", Item: sharedtest.FlagDescriptor(ldbuilders.NewFlagBuilder("foo").Version(1).Build())}, }, }, { Kind: ldstoreimpl.Segments(), Items: []ldstoretypes.KeyedItemDescriptor{ - {Key: "bar", Item: ldstoretypes.ItemDescriptor{Version: 2, Item: ldmodel.Segment{}}}, + {Key: "bar", Item: sharedtest.SegmentDescriptor(ldbuilders.NewSegmentBuilder("bar").Version(2).Build())}, }, }} @@ -124,9 +145,9 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) defer store.Close() - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -148,9 +169,9 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeRead, nil) defer store.Close() - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("key", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } // Even though persist is true, the store was marked as read-only, so it shouldn't be written to. @@ -174,8 +195,8 @@ func TestStore_GetActive(t *testing.T) { assert.NoError(t, err) assert.Equal(t, foo, ldstoretypes.ItemDescriptor{}.NotFound()) - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -206,8 +227,8 @@ func TestStore_GetActive(t *testing.T) { _, err := store.Get(ldstoreimpl.Features(), "foo") assert.Equal(t, errImAPersistentStore, err) - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -230,22 +251,22 @@ func TestStore_SelectorIsRemembered(t *testing.T) { selector4 := fdv2proto.NewSelector("qux", 4) selector5 := fdv2proto.NewSelector("this better be the last one", 5) - store.SetBasis([]fdv2proto.Event{}, selector1, false) + store.SetBasis([]fdv2proto.Change{}, selector1, false) assert.Equal(t, selector1, store.Selector()) - store.SetBasis([]fdv2proto.Event{}, selector2, false) + store.SetBasis([]fdv2proto.Change{}, selector2, false) assert.Equal(t, selector2, store.Selector()) - store.ApplyDelta([]fdv2proto.Event{}, selector3, false) + store.ApplyDelta([]fdv2proto.Change{}, selector3, false) assert.Equal(t, selector3, store.Selector()) - store.ApplyDelta([]fdv2proto.Event{}, selector4, false) + store.ApplyDelta([]fdv2proto.Change{}, selector4, false) assert.Equal(t, selector4, store.Selector()) assert.NoError(t, store.Commit()) assert.Equal(t, selector4, store.Selector()) - store.SetBasis([]fdv2proto.Event{}, selector5, false) + store.SetBasis([]fdv2proto.Change{}, selector5, false) } func TestStore_Concurrency(t *testing.T) { @@ -278,10 +299,10 @@ func TestStore_Concurrency(t *testing.T) { _ = store.IsInitialized() }) go run(func() { - store.SetBasis([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + store.SetBasis([]fdv2proto.Change{}, fdv2proto.NoSelector(), true) }) go run(func() { - store.ApplyDelta([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + store.ApplyDelta([]fdv2proto.Change{}, fdv2proto.NoSelector(), true) }) go run(func() { _ = store.Selector() @@ -331,6 +352,7 @@ func (f *fakeStore) Close() error { // This matcher is required instead of calling ElementsMatch directly on two slices of collections because // the order of the collections, or the order within each collection, is not defined. func requireCollectionsMatch(t *testing.T, expected []ldstoretypes.Collection, actual []ldstoretypes.Collection) { + t.Helper() require.Equal(t, len(expected), len(actual)) for _, expectedCollection := range expected { for _, actualCollection := range actual { diff --git a/internal/fdv2proto/changeset.go b/internal/fdv2proto/changeset.go new file mode 100644 index 00000000..40afd5ea --- /dev/null +++ b/internal/fdv2proto/changeset.go @@ -0,0 +1,119 @@ +package fdv2proto + +import ( + "encoding/json" + "errors" +) + +// ChangeType specifies if an object is being upserted or deleted. +type ChangeType string + +const ( + // ChangeTypePut represents an object being upserted. + ChangeTypePut = ChangeType("put") + + // ChangeTypeDelete represents an object being deleted. + ChangeTypeDelete = ChangeType("delete") +) + +// Change represents a change to a piece of data, such as an update or deletion. +type Change struct { + Action ChangeType + Kind ObjectKind + Key string + Version int + Object json.RawMessage +} + +// ChangeSet represents a list of changes to be applied. +type ChangeSet struct { + intentCode IntentCode + changes []Change + selector Selector +} + +// IntentCode represents the intent of the changeset. +func (c *ChangeSet) IntentCode() IntentCode { + return c.intentCode +} + +// Changes returns the individual changes that should be applied according +// to the intent. +func (c *ChangeSet) Changes() []Change { + return c.changes +} + +// Selector identifies the version of the changes. +func (c *ChangeSet) Selector() Selector { + return c.selector +} + +// ChangeSetBuilder is a helper for constructing a ChangeSet. +type ChangeSetBuilder struct { + intent *ServerIntent + changes []Change +} + +// NewChangeSetBuilder creates a new ChangeSetBuilder, which is empty by default. +func NewChangeSetBuilder() *ChangeSetBuilder { + return &ChangeSetBuilder{} +} + +// NoChanges represents an intent that the current data is up-to-date and doesn't +// require changes. +func (c *ChangeSetBuilder) NoChanges() *ChangeSet { + return &ChangeSet{ + intentCode: IntentNone, + selector: NoSelector(), + changes: nil, + } +} + +// Start begins a new change set with a given intent. +func (c *ChangeSetBuilder) Start(intent ServerIntent) error { + c.intent = &intent + c.changes = nil + return nil +} + +// Finish identifies a changeset with a selector, and returns the completed changeset. +// It clears any existing changes, while preserving the current intent, so that the builder can be reused. +func (c *ChangeSetBuilder) Finish(selector Selector) (*ChangeSet, error) { + if c.intent == nil { + return nil, errors.New("changeset: cannot complete without a server-intent") + } + changes := &ChangeSet{ + intentCode: c.intent.Payload.Code, + selector: selector, + changes: c.changes, + } + c.changes = nil + if c.intent.Payload.Code == IntentTransferFull { + // TODO(SDK-931): We have an awkward situation where we don't get a new intent after receiving a payload + // transferred message, so we need to assume the new intent. But we don't get new Reason/ID/Target, so we don't + // have complete information. + c.intent.Payload.Code = IntentTransferChanges + } + return changes, nil +} + +// AddPut adds a new object to the changeset. +func (c *ChangeSetBuilder) AddPut(kind ObjectKind, key string, version int, object json.RawMessage) { + c.changes = append(c.changes, Change{ + Action: ChangeTypePut, + Kind: kind, + Key: key, + Version: version, + Object: object, + }) +} + +// AddDelete adds a deletion to the changeset. +func (c *ChangeSetBuilder) AddDelete(kind ObjectKind, key string, version int) { + c.changes = append(c.changes, Change{ + Action: ChangeTypeDelete, + Kind: kind, + Key: key, + Version: version, + }) +} diff --git a/internal/fdv2proto/changeset_test.go b/internal/fdv2proto/changeset_test.go new file mode 100644 index 00000000..19247c8b --- /dev/null +++ b/internal/fdv2proto/changeset_test.go @@ -0,0 +1,70 @@ +package fdv2proto + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChangeSetBuilder_New(t *testing.T) { + builder := NewChangeSetBuilder() + assert.NotNil(t, builder) +} + +func TestChangeSetBuilder_MustStartToFinish(t *testing.T) { + builder := NewChangeSetBuilder() + selector := NewSelector("foo", 1) + _, err := builder.Finish(selector) + assert.Error(t, err) + + assert.NoError(t, builder.Start(ServerIntent{Payload: Payload{Code: IntentNone}})) + + _, err = builder.Finish(selector) + assert.NoError(t, err) +} + +func TestChangeSetBuilder_Changes(t *testing.T) { + builder := NewChangeSetBuilder() + err := builder.Start(ServerIntent{Payload: Payload{Code: IntentTransferChanges}}) + assert.NoError(t, err) + + builder.AddPut("foo", "bar", 1, []byte("baz")) + builder.AddDelete("foo", "bar", 1) + + selector := NewSelector("foo", 1) + changeSet, err := builder.Finish(selector) + assert.NoError(t, err) + assert.NotNil(t, changeSet) + + changes := changeSet.Changes() + assert.Equal(t, 2, len(changes)) + assert.Equal(t, Change{Action: ChangeTypePut, Kind: "foo", Key: "bar", Version: 1, Object: []byte("baz")}, changes[0]) + assert.Equal(t, Change{Action: ChangeTypeDelete, Kind: "foo", Key: "bar", Version: 1}, changes[1]) + + assert.Equal(t, IntentTransferChanges, changeSet.IntentCode().Payload.Code) + assert.Equal(t, selector, changeSet.Selector()) + +} + +// After receiving an intent, the SDK may receive 1 or more objects before receiving a payload-transferred. +// At that point, LaunchDarkly may send more objects followed by another payload-transferred. These objects +// should be regarded as part of an implicit "xfer-changes" intent, even though the server doesn't actually send one. +// If the server intends to use an xfer-full instead (for efficiency or other reasons), it will need to explicitly +// send one. +func TestChangeSetBuilder_ImplicitIntentXferChanges(t *testing.T) { + +} + +func TestChangeSetBuilder_NoChanges(t *testing.T) { + builder := NewChangeSetBuilder() + changeSet := builder.NoChanges() + assert.NotNil(t, changeSet) + + intent := changeSet.IntentCode() + assert.NotNil(t, intent) + + assert.Equal(t, IntentNone, intent.Payload.Code) + + assert.False(t, changeSet.Selector().IsDefined()) + assert.Equal(t, NoSelector(), changeSet.Selector()) +} diff --git a/internal/fdv2proto/deltas_to_storable_items.go b/internal/fdv2proto/deltas_to_storable_items.go new file mode 100644 index 00000000..089b96eb --- /dev/null +++ b/internal/fdv2proto/deltas_to_storable_items.go @@ -0,0 +1,59 @@ +package fdv2proto + +import ( + "github.com/launchdarkly/go-jsonstream/v3/jreader" + "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion +// into a data store. +func ToStorableItems(deltas []Change) ([]ldstoretypes.Collection, error) { + collections := make(kindMap) + for _, event := range deltas { + kind, ok := event.Kind.ToFDV1() + if !ok { + // If we don't recognize this kind, it's not an error and should be ignored for forwards + // compatibility. + continue + } + + switch event.Action { + case ChangeTypePut: + // A put requires deserializing the item. We delegate to the optimized streaming JSON + // parser. + reader := jreader.NewReader(event.Object) + item, err := kind.DeserializeFromJSONReader(&reader) + if err != nil { + return nil, err + } + collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{ + Key: event.Key, + Item: item, + }) + case ChangeTypeDelete: + // A deletion is represented by a tombstone, which is an ItemDescriptor with a version and nil item. + collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{ + Key: event.Key, + Item: ldstoretypes.ItemDescriptor{Version: event.Version, Item: nil}, + }) + default: + // An unknown action isn't an error, and should be ignored for forwards compatibility. + continue + } + } + + return collections.flatten(), nil +} + +type kindMap map[datakinds.DataKindInternal][]ldstoretypes.KeyedItemDescriptor + +func (k kindMap) flatten() (result []ldstoretypes.Collection) { + for kind, items := range k { + result = append(result, ldstoretypes.Collection{ + Kind: kind, + Items: items, + }) + } + return +} diff --git a/internal/fdv2proto/event_to_storable_item.go b/internal/fdv2proto/event_to_storable_item.go deleted file mode 100644 index 379ff12d..00000000 --- a/internal/fdv2proto/event_to_storable_item.go +++ /dev/null @@ -1,59 +0,0 @@ -package fdv2proto - -import ( - "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" -) - -// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion -// into a data store. -func ToStorableItems(events []Event) []ldstoretypes.Collection { - flagCollection := ldstoretypes.Collection{ - Kind: datakinds.Features, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0), - } - - segmentCollection := ldstoretypes.Collection{ - Kind: datakinds.Segments, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0), - } - - for _, event := range events { - switch e := event.(type) { - case PutObject: - switch e.Kind { - case FlagKind: - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, - }) - case SegmentKind: - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, - }) - } - case DeleteObject: - switch e.Kind { - case FlagKind: - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{ - Version: e.Version, - Item: nil, - }, - }) - case SegmentKind: - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{ - Version: e.Version, - Item: nil, - }, - }) - } - } - } - - return []ldstoretypes.Collection{flagCollection, segmentCollection} -} diff --git a/internal/fdv2proto/events.go b/internal/fdv2proto/events.go index a28fc986..8d4e76d3 100644 --- a/internal/fdv2proto/events.go +++ b/internal/fdv2proto/events.go @@ -1,8 +1,8 @@ package fdv2proto import ( + "encoding/json" "errors" - "fmt" "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" ) @@ -63,56 +63,46 @@ const ( SegmentKind = ObjectKind("segment") ) -// ErrUnknownKind represents that a given ObjectKind had no FDv1 equivalent -// DataKind. -type ErrUnknownKind struct { - kind ObjectKind -} - -// Is returns true if the error is an ErrUnknownKind. -func (e *ErrUnknownKind) Is(err error) bool { - var errUnknownKind *ErrUnknownKind - ok := errors.As(err, &errUnknownKind) - return ok -} - -func (e *ErrUnknownKind) Error() string { - return fmt.Sprintf("unknown object kind: %s", e.kind) -} - // ToFDV1 converts the object kind to an FDv1 data kind. If there is no equivalent, it returns // an ErrUnknownKind. -func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, error) { +func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, bool) { switch o { case FlagKind: - return datakinds.Features, nil + return datakinds.Features, true case SegmentKind: - return datakinds.Segments, nil + return datakinds.Segments, true default: - return nil, &ErrUnknownKind{o} + return nil, false } } // ServerIntent represents the server's intent. type ServerIntent struct { - // Payloads is a list of payloads, defined to be at least length 1. - Payloads []Payload `json:"payloads"` -} - -//nolint:revive // Event method. -func (ServerIntent) Name() EventName { - return EventServerIntent + Payload Payload } -// PayloadTransferred represents the fact that all payload objects have been sent. -type PayloadTransferred struct { - State string `json:"state"` - Version int `json:"version"` +// UnmarshalJSON unmarshals a ServerIntent from JSON. The intent is required to have at least +// one payload (at index 0) at this time. +func (s *ServerIntent) UnmarshalJSON(data []byte) error { + // Actual protocol object contains a list of payloads, but currently SDKs only support 1. It is a protocol + // error for this list to be empty. + type serverIntent struct { + Payloads []Payload `json:"payloads"` + } + var intent serverIntent + if err := json.Unmarshal(data, &intent); err != nil { + return err + } + if len(intent.Payloads) == 0 { + return errors.New("changeset: server-intent event has no payloads") + } + s.Payload = intent.Payloads[0] + return nil } //nolint:revive // Event method. -func (p PayloadTransferred) Name() EventName { - return EventPayloadTransferred +func (ServerIntent) Name() EventName { + return EventServerIntent } // DeleteObject specifies the deletion of a particular object. @@ -122,6 +112,10 @@ type DeleteObject struct { Key string `json:"key"` } +func (d DeleteObject) Delta() json.RawMessage { + return nil +} + //nolint:revive // Event method. func (d DeleteObject) Name() EventName { return EventDeleteObject @@ -129,10 +123,10 @@ func (d DeleteObject) Name() EventName { // PutObject specifies the addition of a particular object with upsert semantics. type PutObject struct { - Version int `json:"version"` - Kind ObjectKind `json:"kind"` - Key string `json:"key"` - Object any `json:"object"` + Version int `json:"version"` + Kind ObjectKind `json:"kind"` + Key string `json:"key"` + Object json.RawMessage `json:"object"` } //nolint:revive // Event method. diff --git a/internal/fdv2proto/selector.go b/internal/fdv2proto/selector.go index 9e7878fe..0ba5809b 100644 --- a/internal/fdv2proto/selector.go +++ b/internal/fdv2proto/selector.go @@ -11,29 +11,33 @@ type Selector struct { version int } -// NoSelector returns a nil Selector, representing the lack of one. It is -// here only for readability at call sites. -func NoSelector() *Selector { - return nil +// NoSelector returns an empty Selector. +func NoSelector() Selector { + return Selector{} } -// NewSelector creates a new Selector from a state string and version. -func NewSelector(state string, version int) *Selector { - return &Selector{state: state, version: version} +// IsDefined returns true if the Selector has a value. +func (s Selector) IsDefined() bool { + return s != NoSelector() } -// IsSet returns true if the Selector is not nil. -func (s *Selector) IsSet() bool { - return s != nil +// Name returns the name of the event. +func (s Selector) Name() EventName { + return EventPayloadTransferred +} + +// NewSelector creates a new Selector from a state string and version. +func NewSelector(state string, version int) Selector { + return Selector{state: state, version: version} } // State returns the state string of the Selector. This cannot be called if the Selector is nil. -func (s *Selector) State() string { +func (s Selector) State() string { return s.state } // Version returns the version of the Selector. This cannot be called if the Selector is nil. -func (s *Selector) Version() int { +func (s Selector) Version() int { return s.version } diff --git a/internal/sharedtest/mocks/mock_data_destination.go b/internal/sharedtest/mocks/mock_data_destination.go index faa01cb6..97bd6537 100644 --- a/internal/sharedtest/mocks/mock_data_destination.go +++ b/internal/sharedtest/mocks/mock_data_destination.go @@ -44,11 +44,14 @@ func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination } // SetBasis in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { +func (d *MockDataDestination) SetBasis(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) { // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. - collections := fdv2proto.ToStorableItems(events) + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + panic("MockDataDestination.SetBasis received malformed data: " + err.Error()) + } for _, coll := range collections { AssertNotNil(coll.Kind) @@ -57,11 +60,14 @@ func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Se } // ApplyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { +func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) { // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. - collections := fdv2proto.ToStorableItems(events) + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + panic("MockDataDestination.ApplyDelta received malformed data: " + err.Error()) + } for _, coll := range collections { AssertNotNil(coll.Kind) diff --git a/ldclient_evaluation_all_flags_test.go b/ldclient_evaluation_all_flags_test.go index 4ead0168..9858ae85 100644 --- a/ldclient_evaluation_all_flags_test.go +++ b/ldclient_evaluation_all_flags_test.go @@ -2,9 +2,10 @@ package ldclient import ( "errors" - "github.com/stretchr/testify/require" "testing" + "github.com/stretchr/testify/require" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest/mocks" "github.com/launchdarkly/go-sdk-common/v3/ldlog" diff --git a/ldcomponents/data_system_configuration_builder.go b/ldcomponents/data_system_configuration_builder.go index e45d7117..63c40626 100644 --- a/ldcomponents/data_system_configuration_builder.go +++ b/ldcomponents/data_system_configuration_builder.go @@ -16,6 +16,8 @@ type DataSystemConfigurationBuilder struct { config ss.DataSystemConfiguration } +// DataSystemModes provides access to high level strategies for fetching data. The default mode +// is suitable for most use-cases. type DataSystemModes struct{} // Default is LaunchDarkly's recommended flag data acquisition strategy. Currently, it operates a diff --git a/subsystems/data_destination.go b/subsystems/data_destination.go index b74e3305..1dc29f07 100644 --- a/subsystems/data_destination.go +++ b/subsystems/data_destination.go @@ -19,7 +19,7 @@ type DataDestination interface { // // If persist is true, it indicates that the data should be propagated to any connected persistent // store. - SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) + SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) // ApplyDelta applies a set of changes to an existing basis. This operation should be atomic with // respect to any other operations that modify the store. @@ -28,5 +28,5 @@ type DataDestination interface { // // If persist is true, it indicates that the changes should be propagated to any connected persistent // store. - ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) + ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) } diff --git a/subsystems/data_source.go b/subsystems/data_source.go index c199bfd1..f2f1bc34 100644 --- a/subsystems/data_source.go +++ b/subsystems/data_source.go @@ -28,9 +28,9 @@ type DataSource interface { // via Fetch, whereas Synchronizers provide it asynchronously via the injected DataDestination. type Basis struct { // Events is a series of events representing actions applied to data items. - Events []fdv2proto.Event + Events []fdv2proto.Change // Selector identifies this basis. - Selector *fdv2proto.Selector + Selector fdv2proto.Selector // Persist is true if the data source requests that the data store persist the items to any connected // Persistent Stores. Persist bool @@ -50,7 +50,7 @@ type DataSynchronizer interface { DataInitializer // Sync tells the data synchronizer to begin synchronizing data, starting from an optional fdv2proto.Selector. // The selector may be nil indicating that a full Basis should be fetched. - Sync(closeWhenReady chan<- struct{}, selector *fdv2proto.Selector) + Sync(closeWhenReady chan<- struct{}, selector fdv2proto.Selector) // IsInitialized returns true if the data source has successfully initialized at some point. // // Once this is true, it should remain true even if a problem occurs later. diff --git a/subsystems/ldstoreimpl/big_segment_store_wrapper.go b/subsystems/ldstoreimpl/big_segment_store_wrapper.go index 0ba9f567..2502be96 100644 --- a/subsystems/ldstoreimpl/big_segment_store_wrapper.go +++ b/subsystems/ldstoreimpl/big_segment_store_wrapper.go @@ -53,7 +53,7 @@ type BigSegmentStoreWrapper struct { // is true. // // The BigSegmentStoreWrapper takes ownership of the BigSegmentStore's lifecycle at this point, so -// calling Close on the BigSegmentStoreWrapper will also close the store. +// calling Finish on the BigSegmentStoreWrapper will also close the store. // // If not nil, statusUpdateFn will be called whenever the store status has changed. func NewBigSegmentStoreWrapperWithConfig( diff --git a/testhelpers/ldservices/streaming_service.go b/testhelpers/ldservices/streaming_service.go index 7e1d7e36..81e09cf5 100644 --- a/testhelpers/ldservices/streaming_service.go +++ b/testhelpers/ldservices/streaming_service.go @@ -21,7 +21,7 @@ const ( // handler, stream := ldservices.ServerSideStreamingHandler(initialData.ToPutEvent()) // server := httptest.NewServer(handler) // stream.Enqueue(httphelpers.SSEEvent{Event: "patch", Data: myPatchData}) // push an update -// stream.Close() // force any current stream connections to be closed +// stream.Finish() // force any current stream connections to be closed func ServerSideStreamingServiceHandler( initialEvent httphelpers.SSEEvent, ) (http.Handler, httphelpers.SSEStreamControl) { diff --git a/testhelpers/ldservicesv2/server_sdk_data.go b/testhelpers/ldservicesv2/server_sdk_data.go index 9b0b4578..defe26a9 100644 --- a/testhelpers/ldservicesv2/server_sdk_data.go +++ b/testhelpers/ldservicesv2/server_sdk_data.go @@ -7,11 +7,6 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" ) -type fakeVersionedKind struct { - Key string `json:"key"` - Version int `json:"version"` -} - // ServerSDKData is a convenience type for constructing a test server-side SDK data payload for // PollingServiceHandler or StreamingServiceHandler. Its String() method returns a JSON object with // the expected "flags" and "segments" properties. @@ -59,15 +54,23 @@ func (s *ServerSDKData) Segments(segments ...ldmodel.Segment) *ServerSDKData { return s } +func mustMarshal(model any) json.RawMessage { + data, err := json.Marshal(model) + if err != nil { + panic(err) + } + return data +} + // ToPutObjects converts the data to a list of PutObject objects that can be fed to a mock streaming data source. func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { - var objs []fdv2proto.PutObject + objs := make([]fdv2proto.PutObject, 0, len(s.FlagsMap)+len(s.SegmentsMap)) for _, flag := range s.FlagsMap { base := fdv2proto.PutObject{ Version: flag.Version, Kind: fdv2proto.FlagKind, Key: flag.Key, - Object: flag, + Object: mustMarshal(flag), } objs = append(objs, base) } @@ -76,7 +79,7 @@ func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { Version: segment.Version, Kind: fdv2proto.SegmentKind, Key: segment.Key, - Object: segment, + Object: mustMarshal(segment), } objs = append(objs, base) } diff --git a/testhelpers/ldservicesv2/streaming_protocol_builder.go b/testhelpers/ldservicesv2/streaming_protocol_builder.go index 1c6e1e98..9c4e6932 100644 --- a/testhelpers/ldservicesv2/streaming_protocol_builder.go +++ b/testhelpers/ldservicesv2/streaming_protocol_builder.go @@ -40,7 +40,7 @@ func (f *StreamingProtocol) WithPutObject(object fdv2proto.PutObject) *Streaming // WithTransferred adds a PayloadTransferred event to the protocol with a given version. The state is a a placeholder // string. func (f *StreamingProtocol) WithTransferred(version int) *StreamingProtocol { - return f.pushEvent(fdv2proto.PayloadTransferred{State: "[p:17YNC7XBH88Y6RDJJ48EKPCJS7:53]", Version: version}) + return f.pushEvent(fdv2proto.NewSelector("[p:17YNC7XBH88Y6RDJJ48EKPCJS7:53]", version)) } // WithPutObjects adds multiple PutObject events to the protocol.