Skip to content

Commit

Permalink
refactor: clean up fdv2 parsing abstractions
Browse files Browse the repository at this point in the history
  • Loading branch information
cwaldren-ld committed Oct 11, 2024
1 parent ece9cab commit 72ddf8b
Show file tree
Hide file tree
Showing 19 changed files with 430 additions and 422 deletions.
2 changes: 1 addition & 1 deletion interfaces/client_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ type LDClientEvents interface {
// LDClientInterface defines the basic SDK client operations implemented by LDClient.
//
// This includes all methods for evaluating a feature flag or generating analytics events, as defined by
// LDEvaluations and LDEvents. It does not include general control operations like Flush(), Close(), or
// LDEvaluations and LDEvents. It does not include general control operations like Flush(), Finish(), or
// GetDataSourceStatusProvider().
type LDClientInterface interface {
LDClientEvaluations
Expand Down
2 changes: 1 addition & 1 deletion internal/broadcasters.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
//
// The standard pattern is that AddListener returns a new receive-only channel; RemoveListener unsubscribes
// that channel, and closes the sending end of it; Broadcast sends a value to all of the subscribed channels
// (if any); and Close unsubscribes and closes all existing channels.
// (if any); and Finish unsubscribes and closes all existing channels.

// Arbitrary buffer size to make it less likely that we'll block when broadcasting to channels. It is still
// the consumer's responsibility to make sure they're reading the channel.
Expand Down
2 changes: 1 addition & 1 deletion internal/datasource/streaming_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
if !ok {
// COVERAGE: stream.Events is only closed if the EventSource has been closed. However, that
// only happens when we have received from sp.halt, in which case we return immediately
// after calling stream.Close(), terminating the for loop-- so we should not actually reach
// after calling stream.Finish(), terminating the for loop-- so we should not actually reach
// this point. Still, in case the channel is somehow closed unexpectedly, we do want to
// terminate the loop.
return
Expand Down
19 changes: 10 additions & 9 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
// PollingRequester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type PollingRequester interface {
Request() (*PollingResponse, error)
Request() (*fdv2proto.ChangeSet, error)
BaseURI() string
FilterKey() string
}
Expand Down Expand Up @@ -142,21 +142,22 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
}

func (pp *PollingProcessor) poll() error {
response, err := pp.requester.Request()
changeSet, err := pp.requester.Request()

if err != nil {
return err
}

if response.Cached() {
return nil
}

switch response.Intent() {
payload := changeSet.Intent().Payloads[0]
switch payload.Code {
case fdv2proto.IntentTransferFull:
pp.dataDestination.SetBasis(response.Events(), response.Selector(), true)
pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentTransferChanges:
pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true)
pp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentNone:
{
// no-op, we are already up-to-date.
}
}

return nil
Expand Down
115 changes: 25 additions & 90 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@ package datasourcev2

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"

"github.com/launchdarkly/go-jsonstream/v3/jreader"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/gregjones/httpcache"
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

"github.com/gregjones/httpcache"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -70,7 +66,7 @@ func (r *pollingRequester) FilterKey() string {
return r.filterKey
}

func (r *pollingRequester) Request() (*PollingResponse, error) {
func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}
Expand All @@ -80,110 +76,49 @@ func (r *pollingRequester) Request() (*PollingResponse, error) {
return nil, err
}
if cached {
return NewCachedPollingResponse(), nil
return fdv2proto.NewChangeSetBuilder().NoChanges(), nil
}

var payload fdv2proto.PollingPayload
if err = json.Unmarshal(body, &payload); err != nil {
return nil, malformedJSONError{err}
}

parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) {
dataKind, err := kind.ToFDV1()
if err != nil {
return ldstoretypes.ItemDescriptor{}, err
}
item, err := dataKind.DeserializeFromJSONReader(&r)
return item, err
}

updates := make([]fdv2proto.Event, 0, len(payload.Events))

var intentCode fdv2proto.IntentCode
changeSet := fdv2proto.NewChangeSetBuilder()

for _, event := range payload.Events {
switch event.Name {
case fdv2proto.EventServerIntent:
{
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
} else if len(serverIntent.Payloads) == 0 {
return nil, errors.New("server-intent event has no payloads")
}

intentCode = serverIntent.Payloads[0].Code
if intentCode == fdv2proto.IntentNone {
return NewCachedPollingResponse(), nil
}
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
}
if err := changeSet.Start(serverIntent); err != nil {
return nil, err
}
case fdv2proto.EventPutObject:
{
r := jreader.NewReader(event.Data)

var (
key string
kind fdv2proto.ObjectKind
item ldstoretypes.ItemDescriptor
err error
version int
)

for obj := r.Object().WithRequiredProperties([]string{
versionField, kindField, keyField, objectField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
case keyField:
key = r.String()
case objectField:
item, err = parseItem(r, kind)
if err != nil {
return nil, err
}
}
}
updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version})
var put fdv2proto.PutObject
if err := json.Unmarshal(event.Data, &put); err != nil {
return nil, err
}
changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object)
case fdv2proto.EventDeleteObject:
{
r := jreader.NewReader(event.Data)

var (
version int
kind fdv2proto.ObjectKind
key string
)

for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
//nolint:godox
// TODO: An unrecognized kind should be ignored for forwards compat; the question is,
// do we throw out the DeleteObject here, or let the SDK's store handle it?
case keyField:
key = r.String()
}
}
updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version})
var deleteObject fdv2proto.DeleteObject
if err := json.Unmarshal(event.Data, &deleteObject); err != nil {
return nil, err
}
changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version)
case fdv2proto.EventPayloadTransferred:
//nolint:godox
// TODO: deserialize the state and create a fdv2proto.Selector.
var selector fdv2proto.Selector
if err := json.Unmarshal(event.Data, &selector); err != nil {
return nil, err
}
return changeSet.Finish(selector)
}
}

if intentCode == "" {
return nil, errors.New("no server-intent event found in polling response")
}

return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil
return nil, fmt.Errorf("malformed protocol")
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
Expand Down
48 changes: 0 additions & 48 deletions internal/datasourcev2/polling_response.go

This file was deleted.

Loading

0 comments on commit 72ddf8b

Please sign in to comment.