From 72ddf8b3373de7d1902b841ee03cb7056c1275ac Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Fri, 11 Oct 2024 11:12:15 -0700 Subject: [PATCH 1/6] refactor: clean up fdv2 parsing abstractions --- interfaces/client_interface.go | 2 +- internal/broadcasters.go | 2 +- internal/datasource/streaming_data_source.go | 2 +- internal/datasourcev2/polling_data_source.go | 19 ++- internal/datasourcev2/polling_http_request.go | 115 +++---------- internal/datasourcev2/polling_response.go | 48 ------ .../datasourcev2/streaming_data_source.go | 160 +++++------------- internal/datasystem/store.go | 20 ++- internal/datasystem/store_test.go | 75 +++++--- internal/fdv2proto/changeset.go | 99 +++++++++++ internal/fdv2proto/changeset_test.go | 77 +++++++++ .../fdv2proto/deltas_to_storable_items.go | 59 +++++++ internal/fdv2proto/event_to_storable_item.go | 59 ------- internal/fdv2proto/events.go | 71 ++++---- internal/fdv2proto/selector.go | 22 ++- .../sharedtest/mocks/mock_data_destination.go | 14 +- subsystems/data_destination.go | 4 +- .../ldstoreimpl/big_segment_store_wrapper.go | 2 +- testhelpers/ldservices/streaming_service.go | 2 +- 19 files changed, 430 insertions(+), 422 deletions(-) delete mode 100644 internal/datasourcev2/polling_response.go create mode 100644 internal/fdv2proto/changeset.go create mode 100644 internal/fdv2proto/changeset_test.go create mode 100644 internal/fdv2proto/deltas_to_storable_items.go delete mode 100644 internal/fdv2proto/event_to_storable_item.go diff --git a/interfaces/client_interface.go b/interfaces/client_interface.go index 34473b0d..d244d54c 100644 --- a/interfaces/client_interface.go +++ b/interfaces/client_interface.go @@ -195,7 +195,7 @@ type LDClientEvents interface { // LDClientInterface defines the basic SDK client operations implemented by LDClient. // // This includes all methods for evaluating a feature flag or generating analytics events, as defined by -// LDEvaluations and LDEvents. It does not include general control operations like Flush(), Close(), or +// LDEvaluations and LDEvents. It does not include general control operations like Flush(), Finish(), or // GetDataSourceStatusProvider(). type LDClientInterface interface { LDClientEvaluations diff --git a/internal/broadcasters.go b/internal/broadcasters.go index 140ce8de..e8c6df69 100644 --- a/internal/broadcasters.go +++ b/internal/broadcasters.go @@ -10,7 +10,7 @@ import ( // // The standard pattern is that AddListener returns a new receive-only channel; RemoveListener unsubscribes // that channel, and closes the sending end of it; Broadcast sends a value to all of the subscribed channels -// (if any); and Close unsubscribes and closes all existing channels. +// (if any); and Finish unsubscribes and closes all existing channels. // Arbitrary buffer size to make it less likely that we'll block when broadcasting to channels. It is still // the consumer's responsibility to make sure they're reading the channel. diff --git a/internal/datasource/streaming_data_source.go b/internal/datasource/streaming_data_source.go index b0b57a50..24c82732 100644 --- a/internal/datasource/streaming_data_source.go +++ b/internal/datasource/streaming_data_source.go @@ -144,7 +144,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< if !ok { // COVERAGE: stream.Events is only closed if the EventSource has been closed. However, that // only happens when we have received from sp.halt, in which case we return immediately - // after calling stream.Close(), terminating the for loop-- so we should not actually reach + // after calling stream.Finish(), terminating the for loop-- so we should not actually reach // this point. Still, in case the channel is somehow closed unexpectedly, we do want to // terminate the loop. return diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 2311fd54..30f79239 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -21,7 +21,7 @@ const ( // PollingRequester allows PollingProcessor to delegate fetching data to another component. // This is useful for testing the PollingProcessor without needing to set up a test HTTP server. type PollingRequester interface { - Request() (*PollingResponse, error) + Request() (*fdv2proto.ChangeSet, error) BaseURI() string FilterKey() string } @@ -142,21 +142,22 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) { } func (pp *PollingProcessor) poll() error { - response, err := pp.requester.Request() + changeSet, err := pp.requester.Request() if err != nil { return err } - if response.Cached() { - return nil - } - - switch response.Intent() { + payload := changeSet.Intent().Payloads[0] + switch payload.Code { case fdv2proto.IntentTransferFull: - pp.dataDestination.SetBasis(response.Events(), response.Selector(), true) + pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) case fdv2proto.IntentTransferChanges: - pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true) + pp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true) + case fdv2proto.IntentNone: + { + // no-op, we are already up-to-date. + } } return nil diff --git a/internal/datasourcev2/polling_http_request.go b/internal/datasourcev2/polling_http_request.go index 0cd81ddf..ec09f081 100644 --- a/internal/datasourcev2/polling_http_request.go +++ b/internal/datasourcev2/polling_http_request.go @@ -2,21 +2,17 @@ package datasourcev2 import ( "encoding/json" - "errors" "fmt" "io" "net/http" "net/url" - "github.com/launchdarkly/go-jsonstream/v3/jreader" "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" + "github.com/gregjones/httpcache" "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - - "github.com/gregjones/httpcache" "golang.org/x/exp/maps" ) @@ -70,7 +66,7 @@ func (r *pollingRequester) FilterKey() string { return r.filterKey } -func (r *pollingRequester) Request() (*PollingResponse, error) { +func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) { if r.loggers.IsDebugEnabled() { r.loggers.Debug("Polling LaunchDarkly for feature flag updates") } @@ -80,7 +76,7 @@ func (r *pollingRequester) Request() (*PollingResponse, error) { return nil, err } if cached { - return NewCachedPollingResponse(), nil + return fdv2proto.NewChangeSetBuilder().NoChanges(), nil } var payload fdv2proto.PollingPayload @@ -88,102 +84,41 @@ func (r *pollingRequester) Request() (*PollingResponse, error) { return nil, malformedJSONError{err} } - parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { - dataKind, err := kind.ToFDV1() - if err != nil { - return ldstoretypes.ItemDescriptor{}, err - } - item, err := dataKind.DeserializeFromJSONReader(&r) - return item, err - } - - updates := make([]fdv2proto.Event, 0, len(payload.Events)) - - var intentCode fdv2proto.IntentCode + changeSet := fdv2proto.NewChangeSetBuilder() for _, event := range payload.Events { switch event.Name { case fdv2proto.EventServerIntent: - { - var serverIntent fdv2proto.ServerIntent - err := json.Unmarshal(event.Data, &serverIntent) - if err != nil { - return nil, err - } else if len(serverIntent.Payloads) == 0 { - return nil, errors.New("server-intent event has no payloads") - } - - intentCode = serverIntent.Payloads[0].Code - if intentCode == fdv2proto.IntentNone { - return NewCachedPollingResponse(), nil - } + var serverIntent fdv2proto.ServerIntent + err := json.Unmarshal(event.Data, &serverIntent) + if err != nil { + return nil, err + } + if err := changeSet.Start(serverIntent); err != nil { + return nil, err } case fdv2proto.EventPutObject: - { - r := jreader.NewReader(event.Data) - - var ( - key string - kind fdv2proto.ObjectKind - item ldstoretypes.ItemDescriptor - err error - version int - ) - - for obj := r.Object().WithRequiredProperties([]string{ - versionField, kindField, keyField, objectField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - case keyField: - key = r.String() - case objectField: - item, err = parseItem(r, kind) - if err != nil { - return nil, err - } - } - } - updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version}) + var put fdv2proto.PutObject + if err := json.Unmarshal(event.Data, &put); err != nil { + return nil, err } + changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object) case fdv2proto.EventDeleteObject: - { - r := jreader.NewReader(event.Data) - - var ( - version int - kind fdv2proto.ObjectKind - key string - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - } - } - updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) + var deleteObject fdv2proto.DeleteObject + if err := json.Unmarshal(event.Data, &deleteObject); err != nil { + return nil, err } + changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version) case fdv2proto.EventPayloadTransferred: - //nolint:godox - // TODO: deserialize the state and create a fdv2proto.Selector. + var selector fdv2proto.Selector + if err := json.Unmarshal(event.Data, &selector); err != nil { + return nil, err + } + return changeSet.Finish(selector) } } - if intentCode == "" { - return nil, errors.New("no server-intent event found in polling response") - } - - return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil + return nil, fmt.Errorf("malformed protocol") } func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) { diff --git a/internal/datasourcev2/polling_response.go b/internal/datasourcev2/polling_response.go deleted file mode 100644 index 80b2c36f..00000000 --- a/internal/datasourcev2/polling_response.go +++ /dev/null @@ -1,48 +0,0 @@ -package datasourcev2 - -import "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - -// PollingResponse represents the result of a polling request. -type PollingResponse struct { - events []fdv2proto.Event - cached bool - intent fdv2proto.IntentCode - selector *fdv2proto.Selector -} - -// NewCachedPollingResponse indicates that the response has not changed. -func NewCachedPollingResponse() *PollingResponse { - return &PollingResponse{ - cached: true, - } -} - -// NewPollingResponse indicates that data was received. -func NewPollingResponse(intent fdv2proto.IntentCode, events []fdv2proto.Event, - selector *fdv2proto.Selector) *PollingResponse { - return &PollingResponse{ - events: events, - intent: intent, - selector: selector, - } -} - -// Events returns the events in the response. -func (p *PollingResponse) Events() []fdv2proto.Event { - return p.events -} - -// Cached returns true if the response was cached, meaning data has not changed. -func (p *PollingResponse) Cached() bool { - return p.cached -} - -// Intent returns the server intent code of the response. -func (p *PollingResponse) Intent() fdv2proto.IntentCode { - return p.intent -} - -// Selector returns the Selector of the response. -func (p *PollingResponse) Selector() *fdv2proto.Selector { - return p.selector -} diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 3c02812d..b3d64926 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -2,7 +2,6 @@ package datasourcev2 import ( "encoding/json" - "errors" "net/http" "net/url" "sync" @@ -10,7 +9,7 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" - "github.com/launchdarkly/go-jsonstream/v3/jreader" + es "github.com/launchdarkly/eventsource" "github.com/launchdarkly/go-sdk-common/v3/ldlog" "github.com/launchdarkly/go-sdk-common/v3/ldtime" ldevents "github.com/launchdarkly/go-sdk-events/v3" @@ -19,9 +18,6 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/datasource" "github.com/launchdarkly/go-server-sdk/v7/internal/endpoints" "github.com/launchdarkly/go-server-sdk/v7/subsystems" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" - - es "github.com/launchdarkly/eventsource" "golang.org/x/exp/maps" ) @@ -42,11 +38,6 @@ const ( streamingWillRetryMessage = "will retry" ) -type changeSet struct { - intent *fdv2proto.ServerIntent - events []es.Event -} - // Implementation of the streaming data source, not including the lower-level SSE implementation which is in // the eventsource package. // @@ -144,9 +135,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< } }() - currentChangeSet := changeSet{ - events: make([]es.Event, 0), - } + changeSetBuilder := fdv2proto.NewChangeSetBuilder() for { select { @@ -154,7 +143,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< if !ok { // COVERAGE: stream.Events is only closed if the EventSource has been closed. However, that // only happens when we have received from sp.halt, in which case we return immediately - // after calling stream.Close(), terminating the for loop-- so we should not actually reach + // after calling stream.Finish(), terminating the for loop-- so we should not actually reach // this point. Still, in case the channel is somehow closed unexpectedly, we do want to // terminate the loop. return @@ -169,6 +158,10 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< shouldRestart := false gotMalformedEvent := func(event es.Event, err error) { + + // The protocol should "forget" anything that happens upon receiving an error. + changeSetBuilder = fdv2proto.NewChangeSetBuilder() + if event == nil { sp.loggers.Errorf( "Received streaming events with malformed JSON data (%s); will restart stream", @@ -197,29 +190,35 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< case fdv2proto.EventHeartbeat: // Swallow the event and move on. case fdv2proto.EventServerIntent: - //nolint: godox - // TODO: Replace all this json unmarshalling with a nicer jreader implementation. + var serverIntent fdv2proto.ServerIntent err := json.Unmarshal([]byte(event.Data()), &serverIntent) if err != nil { gotMalformedEvent(event, err) break - } else if len(serverIntent.Payloads) == 0 { - gotMalformedEvent(event, errors.New("server-intent event has no payloads")) - break } - if serverIntent.Payloads[0].Code == "none" { - sp.loggers.Info("Server intent is none, skipping") - continue + if err := changeSetBuilder.Start(serverIntent); err != nil { + gotMalformedEvent(event, err) + break } - currentChangeSet = changeSet{events: make([]es.Event, 0), intent: &serverIntent} - case fdv2proto.EventPutObject: - currentChangeSet.events = append(currentChangeSet.events, event) + var p fdv2proto.PutObject + err := json.Unmarshal([]byte(event.Data()), &p) + if err != nil { + gotMalformedEvent(event, err) + break + } + changeSetBuilder.AddPut(p.Kind, p.Key, p.Version, p.Object) case fdv2proto.EventDeleteObject: - currentChangeSet.events = append(currentChangeSet.events, event) + var d fdv2proto.DeleteObject + err := json.Unmarshal([]byte(event.Data()), &d) + if err != nil { + gotMalformedEvent(event, err) + break + } + changeSetBuilder.AddDelete(d.Kind, d.Key, d.Version) case fdv2proto.EventGoodbye: var goodbye fdv2proto.Goodbye err := json.Unmarshal([]byte(event.Data()), &goodbye) @@ -235,48 +234,42 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< var errorData fdv2proto.Error err := json.Unmarshal([]byte(event.Data()), &errorData) if err != nil { - //nolint: godox - // TODO: Confirm that an error means we have to discard - // everything that has come before. - currentChangeSet = changeSet{events: make([]es.Event, 0)} gotMalformedEvent(event, err) break } sp.loggers.Errorf("Error on %s: %s", errorData.PayloadID, errorData.Reason) - currentChangeSet = changeSet{events: make([]es.Event, 0)} - //nolint: godox - // TODO: Do we need to restart here? + // The protocol should "forget" anything that has happened, and expect that we will receive + // more messages in the future (starting with a server intent.) + changeSetBuilder = fdv2proto.NewChangeSetBuilder() case fdv2proto.EventPayloadTransferred: - - selector := &fdv2proto.Selector{} - - err := json.Unmarshal([]byte(event.Data()), selector) + var selector fdv2proto.Selector + err := json.Unmarshal([]byte(event.Data()), &selector) if err != nil { gotMalformedEvent(event, err) break } - currentChangeSet.events = append(currentChangeSet.events, event) - updates, err := deserializeEvents(currentChangeSet.events) + // After calling Finish, the builder is ready to receive a new changeset. + changeSet, err := changeSetBuilder.Finish(selector) if err != nil { - sp.loggers.Errorf("Error processing changeset: %s", err) gotMalformedEvent(nil, err) break } - switch currentChangeSet.intent.Payloads[0].Code { + // The Finish method guarantees that at least one payload is present. + payload := changeSet.Intent().Payloads[0] + switch payload.Code { case fdv2proto.IntentTransferFull: - { - sp.dataDestination.SetBasis(updates, selector, true) - sp.setInitializedAndNotifyClient(true, closeWhenReady) - } + sp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) + sp.setInitializedAndNotifyClient(true, closeWhenReady) case fdv2proto.IntentTransferChanges: - sp.dataDestination.ApplyDelta(updates, selector, true) + sp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true) + case fdv2proto.IntentNone: + // No changes, our existing data is up-to-date for the moment. } - currentChangeSet = changeSet{events: make([]es.Event, 0)} default: sp.loggers.Infof("Unexpected event found in stream: %s", event.Event()) } @@ -446,77 +439,4 @@ func (sp *StreamProcessor) GetFilterKey() string { return sp.cfg.FilterKey } -func deserializeEvents(events []es.Event) ([]fdv2proto.Event, error) { - updates := make([]fdv2proto.Event, 0, len(events)) - - parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) { - dataKind, err := kind.ToFDV1() - if err != nil { - return ldstoretypes.ItemDescriptor{}, err - } - item, err := dataKind.DeserializeFromJSONReader(&r) - return item, err - } - - for _, event := range events { - switch fdv2proto.EventName(event.Event()) { - case fdv2proto.EventPutObject: - r := jreader.NewReader([]byte(event.Data())) - - var ( - kind fdv2proto.ObjectKind - key string - version int - item ldstoretypes.ItemDescriptor - err error - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField, objectField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - case objectField: - item, err = parseItem(r, kind) - if err != nil { - return updates, err - } - } - } - updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item.Item, Version: version}) - case fdv2proto.EventDeleteObject: - r := jreader.NewReader([]byte(event.Data())) - - var ( - version int - kind fdv2proto.ObjectKind - key string - ) - - for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); { - switch string(obj.Name()) { - case versionField: - version = r.Int() - case kindField: - kind = fdv2proto.ObjectKind(r.String()) - //nolint:godox - // TODO: An unrecognized kind should be ignored for forwards compat; the question is, - // do we throw out the DeleteObject here, or let the SDK's store handle it? - case keyField: - key = r.String() - } - } - updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version}) - } - } - - return updates, nil -} - // vim: foldmethod=marker foldlevel=0 diff --git a/internal/datasystem/store.go b/internal/datasystem/store.go index 2cb88318..b09615e7 100644 --- a/internal/datasystem/store.go +++ b/internal/datasystem/store.go @@ -68,7 +68,7 @@ type Store struct { active subsystems.ReadOnlyStore // Identifies the current data. - selector *fdv2proto.Selector + selector fdv2proto.Selector mu sync.RWMutex @@ -134,7 +134,7 @@ func (s *Store) WithPersistence(persistent subsystems.DataStore, mode subsystems } // Selector returns the current selector. -func (s *Store) Selector() *fdv2proto.Selector { +func (s *Store) Selector() fdv2proto.Selector { s.mu.RLock() defer s.mu.RUnlock() return s.selector @@ -152,8 +152,12 @@ func (s *Store) Close() error { // SetBasis sets the basis of the store. Any existing data is discarded. To request data persistence, // set persist to true. -func (s *Store) SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { - collections := fdv2proto.ToStorableItems(events) +func (s *Store) SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) { + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + s.loggers.Errorf("store: couldn't set basis due to malformed data: %v", err) + return + } s.mu.Lock() defer s.mu.Unlock() @@ -177,8 +181,12 @@ func (s *Store) shouldPersist() bool { // ApplyDelta applies a delta update to the store. ApplyDelta should not be called until SetBasis has been called. // To request data persistence, set persist to true. -func (s *Store) ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) { - collections := fdv2proto.ToStorableItems(events) +func (s *Store) ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) { + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + s.loggers.Errorf("store: couldn't apply delta due to malformed data: %v", err) + return + } s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/datasystem/store_test.go b/internal/datasystem/store_test.go index f6701527..7dc074fb 100644 --- a/internal/datasystem/store_test.go +++ b/internal/datasystem/store_test.go @@ -1,14 +1,16 @@ package datasystem import ( + "encoding/json" "errors" + "fmt" + "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" "math/rand" "sync" "testing" "time" - "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldmodel" - "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" "github.com/launchdarkly/go-server-sdk/v7/subsystems" @@ -45,7 +47,7 @@ func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { none := fdv2proto.NoSelector() tests := []struct { name string - selector *fdv2proto.Selector + selector fdv2proto.Selector persist bool }{ {"with selector, persist", v1, true}, @@ -58,12 +60,28 @@ func TestStore_NoPersistence_MemoryStore_IsInitialized(t *testing.T) { logCapture := ldlogtest.NewMockLog() store := NewStore(logCapture.Loggers) defer store.Close() - store.SetBasis([]fdv2proto.Event{}, tt.selector, tt.persist) + store.SetBasis([]fdv2proto.Change{}, tt.selector, tt.persist) assert.True(t, store.IsInitialized()) }) } } +func MustMarshal(model any) json.RawMessage { + data, err := json.Marshal(model) + if err != nil { + panic(err) + } + return data +} + +func MinimalFlag(key string, version int) json.RawMessage { + return []byte(fmt.Sprintf(`{"key":"`+key+`","version": %v}`, version)) +} + +func MinimalSegment(key string, version int) json.RawMessage { + return []byte(fmt.Sprintf(`{"key":"`+key+`","version": %v}`, version)) +} + func TestStore_Commit(t *testing.T) { t.Run("absence of persistent store doesn't cause error when committing", func(t *testing.T) { logCapture := ldlogtest.NewMockLog() @@ -81,24 +99,26 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) defer store.Close() - // The store receives data as a list of events, but the persistent store receives them as an + // The store receives data as a list of changes, but the persistent store receives them as an // []ldstoretypes.Collection. - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldmodel.FeatureFlag{}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: ldmodel.Segment{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } + // OK: basically we need to match up the JSON with the FlagBuilder stuff for this to work, a naive marshal won't work. + // The original data system PR has some infra for this. Maybe bring it in in this pr. output := []ldstoretypes.Collection{ { Kind: ldstoreimpl.Features(), Items: []ldstoretypes.KeyedItemDescriptor{ - {Key: "foo", Item: ldstoretypes.ItemDescriptor{Version: 1, Item: ldmodel.FeatureFlag{}}}, + {Key: "foo", Item: sharedtest.FlagDescriptor(ldbuilders.NewFlagBuilder("foo").Version(1).Build())}, }, }, { Kind: ldstoreimpl.Segments(), Items: []ldstoretypes.KeyedItemDescriptor{ - {Key: "bar", Item: ldstoretypes.ItemDescriptor{Version: 2, Item: ldmodel.Segment{}}}, + {Key: "bar", Item: sharedtest.SegmentDescriptor(ldbuilders.NewSegmentBuilder("bar").Version(2).Build())}, }, }} @@ -124,9 +144,9 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeReadWrite, nil) defer store.Close() - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -148,9 +168,9 @@ func TestStore_Commit(t *testing.T) { store := NewStore(logCapture.Loggers).WithPersistence(spy, subsystems.DataStoreModeRead, nil) defer store.Close() - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Object: ldstoretypes.ItemDescriptor{Version: 1}}, - fdv2proto.PutObject{Kind: fdv2proto.SegmentKind, Key: "bar", Object: ldstoretypes.ItemDescriptor{Version: 2}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("key", 1)}, + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.SegmentKind, Key: "bar", Version: 2, Object: MinimalSegment("bar", 2)}, } // Even though persist is true, the store was marked as read-only, so it shouldn't be written to. @@ -174,8 +194,8 @@ func TestStore_GetActive(t *testing.T) { assert.NoError(t, err) assert.Equal(t, foo, ldstoretypes.ItemDescriptor{}.NotFound()) - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -206,8 +226,8 @@ func TestStore_GetActive(t *testing.T) { _, err := store.Get(ldstoreimpl.Features(), "foo") assert.Equal(t, errImAPersistentStore, err) - input := []fdv2proto.Event{ - fdv2proto.PutObject{Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: ldstoretypes.ItemDescriptor{}}, + input := []fdv2proto.Change{ + {Action: fdv2proto.ChangeTypePut, Kind: fdv2proto.FlagKind, Key: "foo", Version: 1, Object: MinimalFlag("foo", 1)}, } store.SetBasis(input, fdv2proto.NoSelector(), false) @@ -230,22 +250,22 @@ func TestStore_SelectorIsRemembered(t *testing.T) { selector4 := fdv2proto.NewSelector("qux", 4) selector5 := fdv2proto.NewSelector("this better be the last one", 5) - store.SetBasis([]fdv2proto.Event{}, selector1, false) + store.SetBasis([]fdv2proto.Change{}, selector1, false) assert.Equal(t, selector1, store.Selector()) - store.SetBasis([]fdv2proto.Event{}, selector2, false) + store.SetBasis([]fdv2proto.Change{}, selector2, false) assert.Equal(t, selector2, store.Selector()) - store.ApplyDelta([]fdv2proto.Event{}, selector3, false) + store.ApplyDelta([]fdv2proto.Change{}, selector3, false) assert.Equal(t, selector3, store.Selector()) - store.ApplyDelta([]fdv2proto.Event{}, selector4, false) + store.ApplyDelta([]fdv2proto.Change{}, selector4, false) assert.Equal(t, selector4, store.Selector()) assert.NoError(t, store.Commit()) assert.Equal(t, selector4, store.Selector()) - store.SetBasis([]fdv2proto.Event{}, selector5, false) + store.SetBasis([]fdv2proto.Change{}, selector5, false) } func TestStore_Concurrency(t *testing.T) { @@ -278,10 +298,10 @@ func TestStore_Concurrency(t *testing.T) { _ = store.IsInitialized() }) go run(func() { - store.SetBasis([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + store.SetBasis([]fdv2proto.Change{}, fdv2proto.NoSelector(), true) }) go run(func() { - store.ApplyDelta([]fdv2proto.Event{}, fdv2proto.NoSelector(), true) + store.ApplyDelta([]fdv2proto.Change{}, fdv2proto.NoSelector(), true) }) go run(func() { _ = store.Selector() @@ -331,6 +351,7 @@ func (f *fakeStore) Close() error { // This matcher is required instead of calling ElementsMatch directly on two slices of collections because // the order of the collections, or the order within each collection, is not defined. func requireCollectionsMatch(t *testing.T, expected []ldstoretypes.Collection, actual []ldstoretypes.Collection) { + t.Helper() require.Equal(t, len(expected), len(actual)) for _, expectedCollection := range expected { for _, actualCollection := range actual { diff --git a/internal/fdv2proto/changeset.go b/internal/fdv2proto/changeset.go new file mode 100644 index 00000000..eb5c72cf --- /dev/null +++ b/internal/fdv2proto/changeset.go @@ -0,0 +1,99 @@ +package fdv2proto + +import ( + "encoding/json" + "errors" +) + +type ChangeType string + +const ( + ChangeTypePut = ChangeType("put") + ChangeTypeDelete = ChangeType("delete") +) + +type Change struct { + Action ChangeType + Kind ObjectKind + Key string + Version int + Object json.RawMessage +} + +type ChangeSet struct { + intent ServerIntent + changes []Change + selector Selector +} + +func (c *ChangeSet) Intent() ServerIntent { + return c.intent +} + +func (c *ChangeSet) Changes() []Change { + return c.changes +} + +func (c *ChangeSet) Selector() Selector { + return c.selector +} + +type ChangeSetBuilder struct { + intent *ServerIntent + changes []Change +} + +func NewChangeSetBuilder() *ChangeSetBuilder { + return &ChangeSetBuilder{} +} + +func (c *ChangeSetBuilder) NoChanges() *ChangeSet { + return &ChangeSet{ + intent: ServerIntent{Payloads: []Payload{{Code: IntentNone}}}, + selector: NoSelector(), + changes: nil, + } +} + +func (c *ChangeSetBuilder) Start(intent ServerIntent) error { + if len(intent.Payloads) == 0 { + return errors.New("changeset: server-intent event has no payloads") + } + c.intent = &intent + c.changes = nil + return nil +} + +// Finish identifies a changeset with a selector, and returns the completed changeset. +// It clears any existing changes, while preserving the current intent, so that the builder can be reused. +func (c *ChangeSetBuilder) Finish(selector Selector) (*ChangeSet, error) { + if c.intent == nil { + return nil, errors.New("changeset: cannot complete without a server-intent") + } + changes := &ChangeSet{ + intent: *c.intent, + selector: selector, + changes: c.changes, + } + c.changes = nil + return changes, nil +} + +func (c *ChangeSetBuilder) AddPut(kind ObjectKind, key string, version int, object json.RawMessage) { + c.changes = append(c.changes, Change{ + Action: ChangeTypePut, + Kind: kind, + Key: key, + Version: version, + Object: object, + }) +} + +func (c *ChangeSetBuilder) AddDelete(kind ObjectKind, key string, version int) { + c.changes = append(c.changes, Change{ + Action: ChangeTypeDelete, + Kind: kind, + Key: key, + Version: version, + }) +} diff --git a/internal/fdv2proto/changeset_test.go b/internal/fdv2proto/changeset_test.go new file mode 100644 index 00000000..0426c07f --- /dev/null +++ b/internal/fdv2proto/changeset_test.go @@ -0,0 +1,77 @@ +package fdv2proto + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestChangeSetBuilder_New(t *testing.T) { + builder := NewChangeSetBuilder() + assert.NotNil(t, builder) +} + +func TestChangeSetBuilder_MustStartToFinish(t *testing.T) { + builder := NewChangeSetBuilder() + selector := NewSelector("foo", 1) + _, err := builder.Finish(selector) + assert.Error(t, err) + + assert.NoError(t, builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentNone}}})) + + _, err = builder.Finish(selector) + assert.NoError(t, err) +} + +func TestChangeSetBuilder_MustHaveAtLeastOnePayload(t *testing.T) { + builder := NewChangeSetBuilder() + err := builder.Start(ServerIntent{}) + assert.Error(t, err) + + err = builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentNone}}}) + assert.NoError(t, err) + + err = builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentNone}, {Code: IntentNone}, {Code: IntentNone}}}) + assert.NoError(t, err) +} + +func TestChangeSetBuilder_Changes(t *testing.T) { + builder := NewChangeSetBuilder() + err := builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentTransferChanges}}}) + assert.NoError(t, err) + + builder.AddPut("foo", "bar", 1, []byte("baz")) + builder.AddDelete("foo", "bar", 1) + + selector := NewSelector("foo", 1) + changeSet, err := builder.Finish(selector) + assert.NoError(t, err) + assert.NotNil(t, changeSet) + + changes := changeSet.Changes() + assert.Equal(t, 2, len(changes)) + assert.Equal(t, Change{Action: ChangeTypePut, Kind: "foo", Key: "bar", Version: 1, Object: []byte("baz")}, changes[0]) + assert.Equal(t, Change{Action: ChangeTypeDelete, Kind: "foo", Key: "bar", Version: 1}, changes[1]) + + assert.Equal(t, IntentTransferChanges, changeSet.Intent().Payloads[0].Code) + assert.Equal(t, selector, changeSet.Selector()) + +} + +func TestChangeSetBuilder_ImplicitXferChanges(t *testing.T) { + +} + +func TestChangeSetBuilder_NoChanges(t *testing.T) { + builder := NewChangeSetBuilder() + changeSet := builder.NoChanges() + assert.NotNil(t, changeSet) + + intent := changeSet.Intent() + assert.NotNil(t, intent) + + assert.NotEmpty(t, intent.Payloads) + assert.Equal(t, IntentNone, intent.Payloads[0].Code) + + assert.False(t, changeSet.Selector().IsDefined()) + assert.Equal(t, NoSelector(), changeSet.Selector()) +} diff --git a/internal/fdv2proto/deltas_to_storable_items.go b/internal/fdv2proto/deltas_to_storable_items.go new file mode 100644 index 00000000..089b96eb --- /dev/null +++ b/internal/fdv2proto/deltas_to_storable_items.go @@ -0,0 +1,59 @@ +package fdv2proto + +import ( + "github.com/launchdarkly/go-jsonstream/v3/jreader" + "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" + "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" +) + +// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion +// into a data store. +func ToStorableItems(deltas []Change) ([]ldstoretypes.Collection, error) { + collections := make(kindMap) + for _, event := range deltas { + kind, ok := event.Kind.ToFDV1() + if !ok { + // If we don't recognize this kind, it's not an error and should be ignored for forwards + // compatibility. + continue + } + + switch event.Action { + case ChangeTypePut: + // A put requires deserializing the item. We delegate to the optimized streaming JSON + // parser. + reader := jreader.NewReader(event.Object) + item, err := kind.DeserializeFromJSONReader(&reader) + if err != nil { + return nil, err + } + collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{ + Key: event.Key, + Item: item, + }) + case ChangeTypeDelete: + // A deletion is represented by a tombstone, which is an ItemDescriptor with a version and nil item. + collections[kind] = append(collections[kind], ldstoretypes.KeyedItemDescriptor{ + Key: event.Key, + Item: ldstoretypes.ItemDescriptor{Version: event.Version, Item: nil}, + }) + default: + // An unknown action isn't an error, and should be ignored for forwards compatibility. + continue + } + } + + return collections.flatten(), nil +} + +type kindMap map[datakinds.DataKindInternal][]ldstoretypes.KeyedItemDescriptor + +func (k kindMap) flatten() (result []ldstoretypes.Collection) { + for kind, items := range k { + result = append(result, ldstoretypes.Collection{ + Kind: kind, + Items: items, + }) + } + return +} diff --git a/internal/fdv2proto/event_to_storable_item.go b/internal/fdv2proto/event_to_storable_item.go deleted file mode 100644 index 379ff12d..00000000 --- a/internal/fdv2proto/event_to_storable_item.go +++ /dev/null @@ -1,59 +0,0 @@ -package fdv2proto - -import ( - "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" - "github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes" -) - -// ToStorableItems converts a list of FDv2 events to a list of collections suitable for insertion -// into a data store. -func ToStorableItems(events []Event) []ldstoretypes.Collection { - flagCollection := ldstoretypes.Collection{ - Kind: datakinds.Features, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0), - } - - segmentCollection := ldstoretypes.Collection{ - Kind: datakinds.Segments, - Items: make([]ldstoretypes.KeyedItemDescriptor, 0), - } - - for _, event := range events { - switch e := event.(type) { - case PutObject: - switch e.Kind { - case FlagKind: - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, - }) - case SegmentKind: - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{Version: e.Version, Item: e.Object}, - }) - } - case DeleteObject: - switch e.Kind { - case FlagKind: - flagCollection.Items = append(flagCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{ - Version: e.Version, - Item: nil, - }, - }) - case SegmentKind: - segmentCollection.Items = append(segmentCollection.Items, ldstoretypes.KeyedItemDescriptor{ - Key: e.Key, - Item: ldstoretypes.ItemDescriptor{ - Version: e.Version, - Item: nil, - }, - }) - } - } - } - - return []ldstoretypes.Collection{flagCollection, segmentCollection} -} diff --git a/internal/fdv2proto/events.go b/internal/fdv2proto/events.go index a28fc986..6880989b 100644 --- a/internal/fdv2proto/events.go +++ b/internal/fdv2proto/events.go @@ -1,9 +1,8 @@ package fdv2proto import ( + "encoding/json" "errors" - "fmt" - "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" ) @@ -63,56 +62,44 @@ const ( SegmentKind = ObjectKind("segment") ) -// ErrUnknownKind represents that a given ObjectKind had no FDv1 equivalent -// DataKind. -type ErrUnknownKind struct { - kind ObjectKind -} - -// Is returns true if the error is an ErrUnknownKind. -func (e *ErrUnknownKind) Is(err error) bool { - var errUnknownKind *ErrUnknownKind - ok := errors.As(err, &errUnknownKind) - return ok -} - -func (e *ErrUnknownKind) Error() string { - return fmt.Sprintf("unknown object kind: %s", e.kind) -} - // ToFDV1 converts the object kind to an FDv1 data kind. If there is no equivalent, it returns // an ErrUnknownKind. -func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, error) { +func (o ObjectKind) ToFDV1() (datakinds.DataKindInternal, bool) { switch o { case FlagKind: - return datakinds.Features, nil + return datakinds.Features, true case SegmentKind: - return datakinds.Segments, nil + return datakinds.Segments, true default: - return nil, &ErrUnknownKind{o} + return nil, false } } // ServerIntent represents the server's intent. type ServerIntent struct { - // Payloads is a list of payloads, defined to be at least length 1. - Payloads []Payload `json:"payloads"` -} - -//nolint:revive // Event method. -func (ServerIntent) Name() EventName { - return EventServerIntent + Payload Payload } -// PayloadTransferred represents the fact that all payload objects have been sent. -type PayloadTransferred struct { - State string `json:"state"` - Version int `json:"version"` +func (s *ServerIntent) UnmarshalJSON(data []byte) error { + // Actual protocol object contains a list of payloads, but currently SDKs only support 1. It is a protocol + // error for this list to be empty. + type serverIntent struct { + Payloads []Payload `json:"payloads"` + } + var intent serverIntent + if err := json.Unmarshal(data, &intent); err != nil { + return err + } + if len(intent.Payloads) == 0 { + return errors.New("changeset: server-intent event has no payloads") + } + s.Payload = intent.Payloads[0] + return nil } //nolint:revive // Event method. -func (p PayloadTransferred) Name() EventName { - return EventPayloadTransferred +func (ServerIntent) Name() EventName { + return EventServerIntent } // DeleteObject specifies the deletion of a particular object. @@ -122,6 +109,10 @@ type DeleteObject struct { Key string `json:"key"` } +func (d DeleteObject) Delta() json.RawMessage { + return nil +} + //nolint:revive // Event method. func (d DeleteObject) Name() EventName { return EventDeleteObject @@ -129,10 +120,10 @@ func (d DeleteObject) Name() EventName { // PutObject specifies the addition of a particular object with upsert semantics. type PutObject struct { - Version int `json:"version"` - Kind ObjectKind `json:"kind"` - Key string `json:"key"` - Object any `json:"object"` + Version int `json:"version"` + Kind ObjectKind `json:"kind"` + Key string `json:"key"` + Object json.RawMessage `json:"object"` } //nolint:revive // Event method. diff --git a/internal/fdv2proto/selector.go b/internal/fdv2proto/selector.go index 9e7878fe..cd9ded58 100644 --- a/internal/fdv2proto/selector.go +++ b/internal/fdv2proto/selector.go @@ -11,29 +11,27 @@ type Selector struct { version int } -// NoSelector returns a nil Selector, representing the lack of one. It is -// here only for readability at call sites. -func NoSelector() *Selector { - return nil +// NoSelector returns an empty Selector. +func NoSelector() Selector { + return Selector{} } -// NewSelector creates a new Selector from a state string and version. -func NewSelector(state string, version int) *Selector { - return &Selector{state: state, version: version} +func (s Selector) IsDefined() bool { + return s != NoSelector() } -// IsSet returns true if the Selector is not nil. -func (s *Selector) IsSet() bool { - return s != nil +// NewSelector creates a new Selector from a state string and version. +func NewSelector(state string, version int) Selector { + return Selector{state: state, version: version} } // State returns the state string of the Selector. This cannot be called if the Selector is nil. -func (s *Selector) State() string { +func (s Selector) State() string { return s.state } // Version returns the version of the Selector. This cannot be called if the Selector is nil. -func (s *Selector) Version() int { +func (s Selector) Version() int { return s.version } diff --git a/internal/sharedtest/mocks/mock_data_destination.go b/internal/sharedtest/mocks/mock_data_destination.go index faa01cb6..97bd6537 100644 --- a/internal/sharedtest/mocks/mock_data_destination.go +++ b/internal/sharedtest/mocks/mock_data_destination.go @@ -44,11 +44,14 @@ func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination } // SetBasis in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { +func (d *MockDataDestination) SetBasis(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) { // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. - collections := fdv2proto.ToStorableItems(events) + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + panic("MockDataDestination.SetBasis received malformed data: " + err.Error()) + } for _, coll := range collections { AssertNotNil(coll.Kind) @@ -57,11 +60,14 @@ func (d *MockDataDestination) SetBasis(events []fdv2proto.Event, _ *fdv2proto.Se } // ApplyDelta in this test implementation, delegates to d.DataStore.CapturedUpdates. -func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Event, _ *fdv2proto.Selector, _ bool) { +func (d *MockDataDestination) ApplyDelta(events []fdv2proto.Change, _ fdv2proto.Selector, _ bool) { // For now, the selector is ignored. When the data sources start making use of it, it should be // stored so that assertions can be made. - collections := fdv2proto.ToStorableItems(events) + collections, err := fdv2proto.ToStorableItems(events) + if err != nil { + panic("MockDataDestination.ApplyDelta received malformed data: " + err.Error()) + } for _, coll := range collections { AssertNotNil(coll.Kind) diff --git a/subsystems/data_destination.go b/subsystems/data_destination.go index b74e3305..1dc29f07 100644 --- a/subsystems/data_destination.go +++ b/subsystems/data_destination.go @@ -19,7 +19,7 @@ type DataDestination interface { // // If persist is true, it indicates that the data should be propagated to any connected persistent // store. - SetBasis(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) + SetBasis(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) // ApplyDelta applies a set of changes to an existing basis. This operation should be atomic with // respect to any other operations that modify the store. @@ -28,5 +28,5 @@ type DataDestination interface { // // If persist is true, it indicates that the changes should be propagated to any connected persistent // store. - ApplyDelta(events []fdv2proto.Event, selector *fdv2proto.Selector, persist bool) + ApplyDelta(events []fdv2proto.Change, selector fdv2proto.Selector, persist bool) } diff --git a/subsystems/ldstoreimpl/big_segment_store_wrapper.go b/subsystems/ldstoreimpl/big_segment_store_wrapper.go index 0ba9f567..2502be96 100644 --- a/subsystems/ldstoreimpl/big_segment_store_wrapper.go +++ b/subsystems/ldstoreimpl/big_segment_store_wrapper.go @@ -53,7 +53,7 @@ type BigSegmentStoreWrapper struct { // is true. // // The BigSegmentStoreWrapper takes ownership of the BigSegmentStore's lifecycle at this point, so -// calling Close on the BigSegmentStoreWrapper will also close the store. +// calling Finish on the BigSegmentStoreWrapper will also close the store. // // If not nil, statusUpdateFn will be called whenever the store status has changed. func NewBigSegmentStoreWrapperWithConfig( diff --git a/testhelpers/ldservices/streaming_service.go b/testhelpers/ldservices/streaming_service.go index 7e1d7e36..81e09cf5 100644 --- a/testhelpers/ldservices/streaming_service.go +++ b/testhelpers/ldservices/streaming_service.go @@ -21,7 +21,7 @@ const ( // handler, stream := ldservices.ServerSideStreamingHandler(initialData.ToPutEvent()) // server := httptest.NewServer(handler) // stream.Enqueue(httphelpers.SSEEvent{Event: "patch", Data: myPatchData}) // push an update -// stream.Close() // force any current stream connections to be closed +// stream.Finish() // force any current stream connections to be closed func ServerSideStreamingServiceHandler( initialEvent httphelpers.SSEEvent, ) (http.Handler, httphelpers.SSEStreamControl) { From f6c6530b42ae9eb14cf7eb44ea24d2a45fac10a2 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Mon, 14 Oct 2024 13:25:06 -0700 Subject: [PATCH 2/6] cleaning up changeset API --- internal/datasourcev2/polling_data_source.go | 2 +- .../datasourcev2/streaming_data_source.go | 3 +- internal/fdv2proto/changeset.go | 14 ++++----- internal/fdv2proto/changeset_test.go | 30 +++++++------------ 4 files changed, 19 insertions(+), 30 deletions(-) diff --git a/internal/datasourcev2/polling_data_source.go b/internal/datasourcev2/polling_data_source.go index 30f79239..e90fae3d 100644 --- a/internal/datasourcev2/polling_data_source.go +++ b/internal/datasourcev2/polling_data_source.go @@ -148,7 +148,7 @@ func (pp *PollingProcessor) poll() error { return err } - payload := changeSet.Intent().Payloads[0] + payload := changeSet.IntentCode().Payload switch payload.Code { case fdv2proto.IntentTransferFull: pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index b3d64926..e5fc21bb 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -258,8 +258,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< break } - // The Finish method guarantees that at least one payload is present. - payload := changeSet.Intent().Payloads[0] + payload := changeSet.IntentCode().Payload switch payload.Code { case fdv2proto.IntentTransferFull: sp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true) diff --git a/internal/fdv2proto/changeset.go b/internal/fdv2proto/changeset.go index eb5c72cf..9f519255 100644 --- a/internal/fdv2proto/changeset.go +++ b/internal/fdv2proto/changeset.go @@ -21,13 +21,13 @@ type Change struct { } type ChangeSet struct { - intent ServerIntent + intentCode IntentCode changes []Change selector Selector } -func (c *ChangeSet) Intent() ServerIntent { - return c.intent +func (c *ChangeSet) IntentCode() IntentCode{ + return c.intentCode } func (c *ChangeSet) Changes() []Change { @@ -49,16 +49,13 @@ func NewChangeSetBuilder() *ChangeSetBuilder { func (c *ChangeSetBuilder) NoChanges() *ChangeSet { return &ChangeSet{ - intent: ServerIntent{Payloads: []Payload{{Code: IntentNone}}}, + intentCode: IntentNone, selector: NoSelector(), changes: nil, } } func (c *ChangeSetBuilder) Start(intent ServerIntent) error { - if len(intent.Payloads) == 0 { - return errors.New("changeset: server-intent event has no payloads") - } c.intent = &intent c.changes = nil return nil @@ -71,11 +68,12 @@ func (c *ChangeSetBuilder) Finish(selector Selector) (*ChangeSet, error) { return nil, errors.New("changeset: cannot complete without a server-intent") } changes := &ChangeSet{ - intent: *c.intent, + intentCode: c.intent.Payload.Code, selector: selector, changes: c.changes, } c.changes = nil + c.intent.Payload.Code = return changes, nil } diff --git a/internal/fdv2proto/changeset_test.go b/internal/fdv2proto/changeset_test.go index 0426c07f..794198bd 100644 --- a/internal/fdv2proto/changeset_test.go +++ b/internal/fdv2proto/changeset_test.go @@ -16,27 +16,15 @@ func TestChangeSetBuilder_MustStartToFinish(t *testing.T) { _, err := builder.Finish(selector) assert.Error(t, err) - assert.NoError(t, builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentNone}}})) + assert.NoError(t, builder.Start(ServerIntent{Payload: Payload{Code: IntentNone}})) _, err = builder.Finish(selector) assert.NoError(t, err) } -func TestChangeSetBuilder_MustHaveAtLeastOnePayload(t *testing.T) { - builder := NewChangeSetBuilder() - err := builder.Start(ServerIntent{}) - assert.Error(t, err) - - err = builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentNone}}}) - assert.NoError(t, err) - - err = builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentNone}, {Code: IntentNone}, {Code: IntentNone}}}) - assert.NoError(t, err) -} - func TestChangeSetBuilder_Changes(t *testing.T) { builder := NewChangeSetBuilder() - err := builder.Start(ServerIntent{Payloads: []Payload{{Code: IntentTransferChanges}}}) + err := builder.Start(ServerIntent{Payload: Payload{Code: IntentTransferChanges}}) assert.NoError(t, err) builder.AddPut("foo", "bar", 1, []byte("baz")) @@ -52,12 +40,17 @@ func TestChangeSetBuilder_Changes(t *testing.T) { assert.Equal(t, Change{Action: ChangeTypePut, Kind: "foo", Key: "bar", Version: 1, Object: []byte("baz")}, changes[0]) assert.Equal(t, Change{Action: ChangeTypeDelete, Kind: "foo", Key: "bar", Version: 1}, changes[1]) - assert.Equal(t, IntentTransferChanges, changeSet.Intent().Payloads[0].Code) + assert.Equal(t, IntentTransferChanges, changeSet.IntentCode().Payload.Code) assert.Equal(t, selector, changeSet.Selector()) } -func TestChangeSetBuilder_ImplicitXferChanges(t *testing.T) { +// After receiving an intent, the SDK may receive 1 or more objects before receiving a payload-transferred. +// At that point, LaunchDarkly may send more objects followed by another payload-transferred. These objects +// should be regarded as part of an implicit "xfer-changes" intent, even though the server doesn't actually send one. +// If the server intends to use an xfer-full instead (for efficiency or other reasons), it will need to explicitly +// send one. +func TestChangeSetBuilder_ImplicitIntentXferChanges(t *testing.T) { } @@ -66,11 +59,10 @@ func TestChangeSetBuilder_NoChanges(t *testing.T) { changeSet := builder.NoChanges() assert.NotNil(t, changeSet) - intent := changeSet.Intent() + intent := changeSet.IntentCode() assert.NotNil(t, intent) - assert.NotEmpty(t, intent.Payloads) - assert.Equal(t, IntentNone, intent.Payloads[0].Code) + assert.Equal(t, IntentNone, intent.Payload.Code) assert.False(t, changeSet.Selector().IsDefined()) assert.Equal(t, NoSelector(), changeSet.Selector()) From fd536f3ab91936333d9e6cc2b879beb51125bb0a Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Thu, 21 Nov 2024 13:08:58 -0800 Subject: [PATCH 3/6] undo some accidental renames from Close to Finish --- interfaces/client_interface.go | 2 +- internal/broadcasters.go | 2 +- internal/datasource/streaming_data_source.go | 2 +- internal/datasourcev2/streaming_data_source.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/interfaces/client_interface.go b/interfaces/client_interface.go index d244d54c..34473b0d 100644 --- a/interfaces/client_interface.go +++ b/interfaces/client_interface.go @@ -195,7 +195,7 @@ type LDClientEvents interface { // LDClientInterface defines the basic SDK client operations implemented by LDClient. // // This includes all methods for evaluating a feature flag or generating analytics events, as defined by -// LDEvaluations and LDEvents. It does not include general control operations like Flush(), Finish(), or +// LDEvaluations and LDEvents. It does not include general control operations like Flush(), Close(), or // GetDataSourceStatusProvider(). type LDClientInterface interface { LDClientEvaluations diff --git a/internal/broadcasters.go b/internal/broadcasters.go index e8c6df69..140ce8de 100644 --- a/internal/broadcasters.go +++ b/internal/broadcasters.go @@ -10,7 +10,7 @@ import ( // // The standard pattern is that AddListener returns a new receive-only channel; RemoveListener unsubscribes // that channel, and closes the sending end of it; Broadcast sends a value to all of the subscribed channels -// (if any); and Finish unsubscribes and closes all existing channels. +// (if any); and Close unsubscribes and closes all existing channels. // Arbitrary buffer size to make it less likely that we'll block when broadcasting to channels. It is still // the consumer's responsibility to make sure they're reading the channel. diff --git a/internal/datasource/streaming_data_source.go b/internal/datasource/streaming_data_source.go index 24c82732..b0b57a50 100644 --- a/internal/datasource/streaming_data_source.go +++ b/internal/datasource/streaming_data_source.go @@ -144,7 +144,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< if !ok { // COVERAGE: stream.Events is only closed if the EventSource has been closed. However, that // only happens when we have received from sp.halt, in which case we return immediately - // after calling stream.Finish(), terminating the for loop-- so we should not actually reach + // after calling stream.Close(), terminating the for loop-- so we should not actually reach // this point. Still, in case the channel is somehow closed unexpectedly, we do want to // terminate the loop. return diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 8e8788e0..9fa17af1 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -153,7 +153,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< if !ok { // COVERAGE: stream.Events is only closed if the EventSource has been closed. However, that // only happens when we have received from sp.halt, in which case we return immediately - // after calling stream.Finish(), terminating the for loop-- so we should not actually reach + // after calling stream.Close(), terminating the for loop-- so we should not actually reach // this point. Still, in case the channel is somehow closed unexpectedly, we do want to // terminate the loop. return From 7c0609cd1c1216e9e3f1be0b5ac6d3eb442e3c44 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Thu, 21 Nov 2024 13:18:32 -0800 Subject: [PATCH 4/6] goimports gofmt --- internal/datasourcev2/streaming_data_source.go | 1 + internal/datasystem/store_test.go | 5 +++-- internal/fdv2proto/changeset_test.go | 3 ++- internal/fdv2proto/events.go | 1 + internal/fdv2proto/selector.go | 4 ++++ ldclient_evaluation_all_flags_test.go | 3 ++- testhelpers/ldservicesv2/server_sdk_data.go | 12 ++++++++++-- .../ldservicesv2/streaming_protocol_builder.go | 2 +- 8 files changed, 24 insertions(+), 7 deletions(-) diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 9fa17af1..65ef19ce 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -11,6 +11,7 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" "context" + es "github.com/launchdarkly/eventsource" "github.com/launchdarkly/go-sdk-common/v3/ldlog" diff --git a/internal/datasystem/store_test.go b/internal/datasystem/store_test.go index 7dc074fb..5d6e76fe 100644 --- a/internal/datasystem/store_test.go +++ b/internal/datasystem/store_test.go @@ -4,13 +4,14 @@ import ( "encoding/json" "errors" "fmt" - "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" - "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" "math/rand" "sync" "testing" "time" + "github.com/launchdarkly/go-server-sdk-evaluation/v3/ldbuilders" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest" + "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" "github.com/launchdarkly/go-server-sdk/v7/subsystems" diff --git a/internal/fdv2proto/changeset_test.go b/internal/fdv2proto/changeset_test.go index 794198bd..19247c8b 100644 --- a/internal/fdv2proto/changeset_test.go +++ b/internal/fdv2proto/changeset_test.go @@ -1,8 +1,9 @@ package fdv2proto import ( - "github.com/stretchr/testify/assert" "testing" + + "github.com/stretchr/testify/assert" ) func TestChangeSetBuilder_New(t *testing.T) { diff --git a/internal/fdv2proto/events.go b/internal/fdv2proto/events.go index 6880989b..ae4fc7b1 100644 --- a/internal/fdv2proto/events.go +++ b/internal/fdv2proto/events.go @@ -3,6 +3,7 @@ package fdv2proto import ( "encoding/json" "errors" + "github.com/launchdarkly/go-server-sdk/v7/internal/datakinds" ) diff --git a/internal/fdv2proto/selector.go b/internal/fdv2proto/selector.go index cd9ded58..3eafc366 100644 --- a/internal/fdv2proto/selector.go +++ b/internal/fdv2proto/selector.go @@ -20,6 +20,10 @@ func (s Selector) IsDefined() bool { return s != NoSelector() } +func (s Selector) Name() EventName { + return EventPayloadTransferred +} + // NewSelector creates a new Selector from a state string and version. func NewSelector(state string, version int) Selector { return Selector{state: state, version: version} diff --git a/ldclient_evaluation_all_flags_test.go b/ldclient_evaluation_all_flags_test.go index 4ead0168..9858ae85 100644 --- a/ldclient_evaluation_all_flags_test.go +++ b/ldclient_evaluation_all_flags_test.go @@ -2,9 +2,10 @@ package ldclient import ( "errors" - "github.com/stretchr/testify/require" "testing" + "github.com/stretchr/testify/require" + "github.com/launchdarkly/go-server-sdk/v7/internal/sharedtest/mocks" "github.com/launchdarkly/go-sdk-common/v3/ldlog" diff --git a/testhelpers/ldservicesv2/server_sdk_data.go b/testhelpers/ldservicesv2/server_sdk_data.go index 9b0b4578..8cd2a42a 100644 --- a/testhelpers/ldservicesv2/server_sdk_data.go +++ b/testhelpers/ldservicesv2/server_sdk_data.go @@ -59,6 +59,14 @@ func (s *ServerSDKData) Segments(segments ...ldmodel.Segment) *ServerSDKData { return s } +func MustMarshal(model any) json.RawMessage { + data, err := json.Marshal(model) + if err != nil { + panic(err) + } + return data +} + // ToPutObjects converts the data to a list of PutObject objects that can be fed to a mock streaming data source. func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { var objs []fdv2proto.PutObject @@ -67,7 +75,7 @@ func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { Version: flag.Version, Kind: fdv2proto.FlagKind, Key: flag.Key, - Object: flag, + Object: MustMarshal(flag), } objs = append(objs, base) } @@ -76,7 +84,7 @@ func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { Version: segment.Version, Kind: fdv2proto.SegmentKind, Key: segment.Key, - Object: segment, + Object: MustMarshal(segment), } objs = append(objs, base) } diff --git a/testhelpers/ldservicesv2/streaming_protocol_builder.go b/testhelpers/ldservicesv2/streaming_protocol_builder.go index 1c6e1e98..9c4e6932 100644 --- a/testhelpers/ldservicesv2/streaming_protocol_builder.go +++ b/testhelpers/ldservicesv2/streaming_protocol_builder.go @@ -40,7 +40,7 @@ func (f *StreamingProtocol) WithPutObject(object fdv2proto.PutObject) *Streaming // WithTransferred adds a PayloadTransferred event to the protocol with a given version. The state is a a placeholder // string. func (f *StreamingProtocol) WithTransferred(version int) *StreamingProtocol { - return f.pushEvent(fdv2proto.PayloadTransferred{State: "[p:17YNC7XBH88Y6RDJJ48EKPCJS7:53]", Version: version}) + return f.pushEvent(fdv2proto.NewSelector("[p:17YNC7XBH88Y6RDJJ48EKPCJS7:53]", version)) } // WithPutObjects adds multiple PutObject events to the protocol. From 4126571fc6330997cbbe3fb15a635272bdc65131 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Fri, 22 Nov 2024 14:42:46 -0800 Subject: [PATCH 5/6] lint errors --- internal/datasourcev2/streaming_data_source.go | 6 ------ internal/fdv2proto/changeset.go | 13 +++++++++++++ internal/fdv2proto/selector.go | 2 ++ testhelpers/ldservicesv2/server_sdk_data.go | 5 ----- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/internal/datasourcev2/streaming_data_source.go b/internal/datasourcev2/streaming_data_source.go index 65ef19ce..4e63c06b 100644 --- a/internal/datasourcev2/streaming_data_source.go +++ b/internal/datasourcev2/streaming_data_source.go @@ -27,11 +27,6 @@ import ( ) const ( - keyField = "key" - kindField = "kind" - versionField = "version" - objectField = "object" - streamReadTimeout = 5 * time.Minute // the LaunchDarkly stream should send a heartbeat comment every 3 minutes streamMaxRetryDelay = 30 * time.Second streamRetryResetInterval = 60 * time.Second @@ -169,7 +164,6 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan< shouldRestart := false gotMalformedEvent := func(event es.Event, err error) { - // The protocol should "forget" anything that happens upon receiving an error. changeSetBuilder = fdv2proto.NewChangeSetBuilder() diff --git a/internal/fdv2proto/changeset.go b/internal/fdv2proto/changeset.go index a7056eeb..55972794 100644 --- a/internal/fdv2proto/changeset.go +++ b/internal/fdv2proto/changeset.go @@ -12,6 +12,7 @@ const ( ChangeTypeDelete = ChangeType("delete") ) +// Change represents a change to a piece of data, such as an update or deletion. type Change struct { Action ChangeType Kind ObjectKind @@ -20,33 +21,42 @@ type Change struct { Object json.RawMessage } +// ChangeSet represents a list of changes to be applied. type ChangeSet struct { intentCode IntentCode changes []Change selector Selector } +// IntentCode represents the intent of the changeset. func (c *ChangeSet) IntentCode() IntentCode { return c.intentCode } +// Changes returns the individual changes that should be applied according +// to the intent. func (c *ChangeSet) Changes() []Change { return c.changes } +// Selector identifies the version of the changes. func (c *ChangeSet) Selector() Selector { return c.selector } +// ChangeSetBuilder is a helper for constructing a ChangeSet. type ChangeSetBuilder struct { intent *ServerIntent changes []Change } +// NewChangeSetBuilder creates a new ChangeSetBuilder, which is empty by default. func NewChangeSetBuilder() *ChangeSetBuilder { return &ChangeSetBuilder{} } +// NoChanges represents an intent that the current data is up-to-date and doesn't +// require changes. func (c *ChangeSetBuilder) NoChanges() *ChangeSet { return &ChangeSet{ intentCode: IntentNone, @@ -55,6 +65,7 @@ func (c *ChangeSetBuilder) NoChanges() *ChangeSet { } } +// Start begins a new change set with a given intent. func (c *ChangeSetBuilder) Start(intent ServerIntent) error { c.intent = &intent c.changes = nil @@ -82,6 +93,7 @@ func (c *ChangeSetBuilder) Finish(selector Selector) (*ChangeSet, error) { return changes, nil } +// AddPut adds a new object to the changeset. func (c *ChangeSetBuilder) AddPut(kind ObjectKind, key string, version int, object json.RawMessage) { c.changes = append(c.changes, Change{ Action: ChangeTypePut, @@ -92,6 +104,7 @@ func (c *ChangeSetBuilder) AddPut(kind ObjectKind, key string, version int, obje }) } +// AddDelete adds a deletion to the changeset. func (c *ChangeSetBuilder) AddDelete(kind ObjectKind, key string, version int) { c.changes = append(c.changes, Change{ Action: ChangeTypeDelete, diff --git a/internal/fdv2proto/selector.go b/internal/fdv2proto/selector.go index 3eafc366..0ba5809b 100644 --- a/internal/fdv2proto/selector.go +++ b/internal/fdv2proto/selector.go @@ -16,10 +16,12 @@ func NoSelector() Selector { return Selector{} } +// IsDefined returns true if the Selector has a value. func (s Selector) IsDefined() bool { return s != NoSelector() } +// Name returns the name of the event. func (s Selector) Name() EventName { return EventPayloadTransferred } diff --git a/testhelpers/ldservicesv2/server_sdk_data.go b/testhelpers/ldservicesv2/server_sdk_data.go index 8cd2a42a..081d65af 100644 --- a/testhelpers/ldservicesv2/server_sdk_data.go +++ b/testhelpers/ldservicesv2/server_sdk_data.go @@ -7,11 +7,6 @@ import ( "github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto" ) -type fakeVersionedKind struct { - Key string `json:"key"` - Version int `json:"version"` -} - // ServerSDKData is a convenience type for constructing a test server-side SDK data payload for // PollingServiceHandler or StreamingServiceHandler. Its String() method returns a JSON object with // the expected "flags" and "segments" properties. From ec64d947673587811721e4af31e390c2531067e7 Mon Sep 17 00:00:00 2001 From: Casey Waldren Date: Fri, 22 Nov 2024 14:48:20 -0800 Subject: [PATCH 6/6] more lints --- internal/datasystem/fdv2_datasystem.go | 4 ++-- internal/fdv2proto/changeset.go | 6 +++++- internal/fdv2proto/events.go | 2 ++ ldcomponents/data_system_configuration_builder.go | 2 ++ testhelpers/ldservicesv2/server_sdk_data.go | 8 ++++---- 5 files changed, 15 insertions(+), 7 deletions(-) diff --git a/internal/datasystem/fdv2_datasystem.go b/internal/datasystem/fdv2_datasystem.go index e78b313b..0c77e5c7 100644 --- a/internal/datasystem/fdv2_datasystem.go +++ b/internal/datasystem/fdv2_datasystem.go @@ -69,8 +69,8 @@ type FDv2 struct { status interfaces.DataSourceStatus } -func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration], clientContext *internal.ClientContextImpl) (*FDv2, error) { - +func NewFDv2(disabled bool, cfgBuilder subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration], + clientContext *internal.ClientContextImpl) (*FDv2, error) { store := NewStore(clientContext.GetLogging().Loggers) bcasters := &broadcasters{ diff --git a/internal/fdv2proto/changeset.go b/internal/fdv2proto/changeset.go index 55972794..40afd5ea 100644 --- a/internal/fdv2proto/changeset.go +++ b/internal/fdv2proto/changeset.go @@ -5,10 +5,14 @@ import ( "errors" ) +// ChangeType specifies if an object is being upserted or deleted. type ChangeType string const ( - ChangeTypePut = ChangeType("put") + // ChangeTypePut represents an object being upserted. + ChangeTypePut = ChangeType("put") + + // ChangeTypeDelete represents an object being deleted. ChangeTypeDelete = ChangeType("delete") ) diff --git a/internal/fdv2proto/events.go b/internal/fdv2proto/events.go index ae4fc7b1..8d4e76d3 100644 --- a/internal/fdv2proto/events.go +++ b/internal/fdv2proto/events.go @@ -81,6 +81,8 @@ type ServerIntent struct { Payload Payload } +// UnmarshalJSON unmarshals a ServerIntent from JSON. The intent is required to have at least +// one payload (at index 0) at this time. func (s *ServerIntent) UnmarshalJSON(data []byte) error { // Actual protocol object contains a list of payloads, but currently SDKs only support 1. It is a protocol // error for this list to be empty. diff --git a/ldcomponents/data_system_configuration_builder.go b/ldcomponents/data_system_configuration_builder.go index e45d7117..63c40626 100644 --- a/ldcomponents/data_system_configuration_builder.go +++ b/ldcomponents/data_system_configuration_builder.go @@ -16,6 +16,8 @@ type DataSystemConfigurationBuilder struct { config ss.DataSystemConfiguration } +// DataSystemModes provides access to high level strategies for fetching data. The default mode +// is suitable for most use-cases. type DataSystemModes struct{} // Default is LaunchDarkly's recommended flag data acquisition strategy. Currently, it operates a diff --git a/testhelpers/ldservicesv2/server_sdk_data.go b/testhelpers/ldservicesv2/server_sdk_data.go index 081d65af..defe26a9 100644 --- a/testhelpers/ldservicesv2/server_sdk_data.go +++ b/testhelpers/ldservicesv2/server_sdk_data.go @@ -54,7 +54,7 @@ func (s *ServerSDKData) Segments(segments ...ldmodel.Segment) *ServerSDKData { return s } -func MustMarshal(model any) json.RawMessage { +func mustMarshal(model any) json.RawMessage { data, err := json.Marshal(model) if err != nil { panic(err) @@ -64,13 +64,13 @@ func MustMarshal(model any) json.RawMessage { // ToPutObjects converts the data to a list of PutObject objects that can be fed to a mock streaming data source. func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { - var objs []fdv2proto.PutObject + objs := make([]fdv2proto.PutObject, 0, len(s.FlagsMap)+len(s.SegmentsMap)) for _, flag := range s.FlagsMap { base := fdv2proto.PutObject{ Version: flag.Version, Kind: fdv2proto.FlagKind, Key: flag.Key, - Object: MustMarshal(flag), + Object: mustMarshal(flag), } objs = append(objs, base) } @@ -79,7 +79,7 @@ func (s *ServerSDKData) ToPutObjects() []fdv2proto.PutObject { Version: segment.Version, Kind: fdv2proto.SegmentKind, Key: segment.Key, - Object: MustMarshal(segment), + Object: mustMarshal(segment), } objs = append(objs, base) }