Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: clean up fdv2 parsing abstractions #200

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// PollingRequester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type PollingRequester interface {
Request() (*PollingResponse, error)
Request() (*fdv2proto.ChangeSet, error)
BaseURI() string
FilterKey() string
}
Expand Down Expand Up @@ -86,11 +86,11 @@ func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error)
if err != nil {
return nil, err
}
return &subsystems.Basis{Events: basis.Events(), Selector: basis.Selector(), Persist: true}, nil
return &subsystems.Basis{Events: basis.Changes(), Selector: basis.Selector(), Persist: true}, nil
}

//nolint:revive // DataSynchronizer method.
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ *fdv2proto.Selector) {
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) {
pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)

ticker := newTickerWithInitialTick(pp.pollInterval)
Expand Down Expand Up @@ -159,21 +159,22 @@ func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ *fdv2proto.Se
}

func (pp *PollingProcessor) poll() error {
response, err := pp.requester.Request()
changeSet, err := pp.requester.Request()

if err != nil {
return err
}

if response.Cached() {
return nil
}

switch response.Intent() {
code := changeSet.IntentCode()
switch code {
case fdv2proto.IntentTransferFull:
pp.dataDestination.SetBasis(response.Events(), response.Selector(), true)
pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentTransferChanges:
pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true)
pp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentNone:
{
// no-op, we are already up-to-date.
}
}

return nil
Expand Down
128 changes: 24 additions & 104 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,17 @@ package datasourcev2

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"

"github.com/launchdarkly/go-jsonstream/v3/jreader"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/gregjones/httpcache"
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

"github.com/gregjones/httpcache"
"golang.org/x/exp/maps"
)

Expand Down Expand Up @@ -70,7 +66,7 @@ func (r *pollingRequester) FilterKey() string {
return r.filterKey
}

func (r *pollingRequester) Request() (*PollingResponse, error) {
func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}
Expand All @@ -80,125 +76,49 @@ func (r *pollingRequester) Request() (*PollingResponse, error) {
return nil, err
}
if cached {
return NewCachedPollingResponse(), nil
return fdv2proto.NewChangeSetBuilder().NoChanges(), nil
}

var payload fdv2proto.PollingPayload
payloadSelector := fdv2proto.NoSelector()
if err = json.Unmarshal(body, &payload); err != nil {
return nil, malformedJSONError{err}
}

parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) {
dataKind, err := kind.ToFDV1()
if err != nil {
return ldstoretypes.ItemDescriptor{}, err
}
item, err := dataKind.DeserializeFromJSONReader(&r)
return item, err
}

updates := make([]fdv2proto.Event, 0, len(payload.Events))

var intentCode fdv2proto.IntentCode
changeSet := fdv2proto.NewChangeSetBuilder()

for _, event := range payload.Events {
switch event.Name {
case fdv2proto.EventServerIntent:
{
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
} else if len(serverIntent.Payloads) == 0 {
return nil, errors.New("server-intent event has no payloads")
}

intentCode = serverIntent.Payloads[0].Code
if intentCode == fdv2proto.IntentNone {
return NewCachedPollingResponse(), nil
}
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
}
if err := changeSet.Start(serverIntent); err != nil {
return nil, err
}
case fdv2proto.EventPutObject:
{
r := jreader.NewReader(event.Data)

var (
key string
kind fdv2proto.ObjectKind
item ldstoretypes.ItemDescriptor
err error
version int
)

for obj := r.Object().WithRequiredProperties([]string{
versionField, kindField, keyField, objectField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
case keyField:
key = r.String()
case objectField:
item, err = parseItem(r, kind)
if err != nil {
return nil, err
}
}
}
updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item.Item, Version: version})
var put fdv2proto.PutObject
if err := json.Unmarshal(event.Data, &put); err != nil {
return nil, err
}
changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object)
case fdv2proto.EventDeleteObject:
{
r := jreader.NewReader(event.Data)

var (
version int
kind fdv2proto.ObjectKind
key string
)

for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
//nolint:godox
// TODO: An unrecognized kind should be ignored for forwards compat; the question is,
// do we throw out the DeleteObject here, or let the SDK's store handle it?
case keyField:
key = r.String()
}
}
updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version})
var deleteObject fdv2proto.DeleteObject
if err := json.Unmarshal(event.Data, &deleteObject); err != nil {
return nil, err
}
changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version)
case fdv2proto.EventPayloadTransferred:
r := jreader.NewReader(event.Data)

var (
state string
version int
)

for obj := r.Object().WithRequiredProperties([]string{stateField, versionField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case stateField:
state = r.String()
}
var selector fdv2proto.Selector
if err := json.Unmarshal(event.Data, &selector); err != nil {
return nil, err
}
payloadSelector = fdv2proto.NewSelector(state, version)
return changeSet.Finish(selector)
}
}

if intentCode == "" {
return nil, errors.New("no server-intent event found in polling response")
}

return NewPollingResponse(intentCode, updates, payloadSelector), nil
return nil, fmt.Errorf("malformed protocol")
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
Expand Down
48 changes: 0 additions & 48 deletions internal/datasourcev2/polling_response.go

This file was deleted.

Loading