Skip to content

Commit

Permalink
refactor: add internal data system config (#185)
Browse files Browse the repository at this point in the history
This PR represents a major internal refactoring of the FDv2 data system
components.

Along with better support for the FDv2 protocol, it adds support for
FDv2 data source contract tests as well.

Missing pieces include an updated Data Source Status monitoring API, as
well as support for dual synchronizers.

---------

Co-authored-by: Matthew M. Keeler <[email protected]>
  • Loading branch information
cwaldren-ld and keelerm84 authored Dec 6, 2024
1 parent e5d10e0 commit 67b411e
Show file tree
Hide file tree
Showing 32 changed files with 1,942 additions and 531 deletions.
20 changes: 20 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,24 @@ type Config struct {
// LaunchDarkly provides integration packages, and most applications will not
// need to implement their own hooks.
Hooks []ldhooks.Hook

// This field is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
// It is not suitable for production usage. Do not use it. You have been warned.
//
// DataSystem configures how data (e.g. flags, segments) are retrieved by the SDK.
//
// Set this field only if you want to specify non-default values for any of the data system configuration,
// such as defining an alternate data source or setting up a persistent store.
//
// Below, the default configuration is described with the relevant config item in parentheses:
// 1. The SDK will first attempt to fetch all data from LaunchDarkly's global Content Delivery Network (Initializer)
// 2. It will then establish a streaming connection with LaunchDarkly's realtime Flag Delivery Network (Primary
// Synchronizer.)
// 3. If at any point the connection to the realtime network is interrupted for a short period of time,
// the connection will be automatically re-established.
// 4. If the connection cannot be re-established over a sustained period, the SDK will begin to make periodic
// requests to LaunchDarkly's global CDN (Secondary Synchronizer)
// 5. After a period of time, the SDK will swap back to the realtime Flag Delivery Network if it becomes
// available again.
DataSystem subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration]
}
40 changes: 29 additions & 11 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasourcev2

import (
"context"
"sync"
"time"

Expand All @@ -21,7 +22,7 @@ const (
// PollingRequester allows PollingProcessor to delegate fetching data to another component.
// This is useful for testing the PollingProcessor without needing to set up a test HTTP server.
type PollingRequester interface {
Request() (*PollingResponse, error)
Request() (*fdv2proto.ChangeSet, error)
BaseURI() string
FilterKey() string
}
Expand Down Expand Up @@ -72,8 +73,24 @@ func newPollingProcessor(
return pp
}

//nolint:revive // no doc comment for standard method
func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Name() string {
return "PollingDataSourceV2"
}

//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) {
//nolint:godox
// TODO(SDK-752): Plumb the context into the request method.
basis, err := pp.requester.Request()
if err != nil {
return nil, err
}
return &subsystems.Basis{Events: basis.Changes(), Selector: basis.Selector(), Persist: true}, nil
}

//nolint:revive // DataSynchronizer method.
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) {
pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)

ticker := newTickerWithInitialTick(pp.pollInterval)
Expand Down Expand Up @@ -142,21 +159,22 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
}

func (pp *PollingProcessor) poll() error {
response, err := pp.requester.Request()
changeSet, err := pp.requester.Request()

if err != nil {
return err
}

if response.Cached() {
return nil
}

switch response.Intent() {
code := changeSet.IntentCode()
switch code {
case fdv2proto.IntentTransferFull:
pp.dataDestination.SetBasis(response.Events(), response.Selector(), true)
pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentTransferChanges:
pp.dataDestination.ApplyDelta(response.Events(), response.Selector(), true)
pp.dataDestination.ApplyDelta(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentNone:
{
// no-op, we are already up-to-date.
}
}

return nil
Expand Down
121 changes: 30 additions & 91 deletions internal/datasourcev2/polling_http_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@ package datasourcev2

import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"

"github.com/launchdarkly/go-jsonstream/v3/jreader"
"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"github.com/gregjones/httpcache"
"golang.org/x/exp/maps"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-server-sdk/v7/internal/endpoints"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

"github.com/gregjones/httpcache"
"golang.org/x/exp/maps"
)

// pollingRequester is the internal implementation of getting flag/segment data from the LD polling endpoints.
Expand Down Expand Up @@ -70,7 +67,7 @@ func (r *pollingRequester) FilterKey() string {
return r.filterKey
}

func (r *pollingRequester) Request() (*PollingResponse, error) {
func (r *pollingRequester) Request() (*fdv2proto.ChangeSet, error) {
if r.loggers.IsDebugEnabled() {
r.loggers.Debug("Polling LaunchDarkly for feature flag updates")
}
Expand All @@ -80,110 +77,52 @@ func (r *pollingRequester) Request() (*PollingResponse, error) {
return nil, err
}
if cached {
return NewCachedPollingResponse(), nil
return fdv2proto.NewChangeSetBuilder().NoChanges(), nil
}

var payload fdv2proto.PollingPayload
if err = json.Unmarshal(body, &payload); err != nil {
return nil, malformedJSONError{err}
}

parseItem := func(r jreader.Reader, kind fdv2proto.ObjectKind) (ldstoretypes.ItemDescriptor, error) {
dataKind, err := kind.ToFDV1()
if err != nil {
return ldstoretypes.ItemDescriptor{}, err
}
item, err := dataKind.DeserializeFromJSONReader(&r)
return item, err
}

updates := make([]fdv2proto.Event, 0, len(payload.Events))

var intentCode fdv2proto.IntentCode
changeSet := fdv2proto.NewChangeSetBuilder()

for _, event := range payload.Events {
switch event.Name {
case fdv2proto.EventServerIntent:
{
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
} else if len(serverIntent.Payloads) == 0 {
return nil, errors.New("server-intent event has no payloads")
}

intentCode = serverIntent.Payloads[0].Code
if intentCode == fdv2proto.IntentNone {
return NewCachedPollingResponse(), nil
}
var serverIntent fdv2proto.ServerIntent
err := json.Unmarshal(event.Data, &serverIntent)
if err != nil {
return nil, err
}
if serverIntent.Payload.Code == fdv2proto.IntentNone {
return changeSet.NoChanges(), nil
}
if err := changeSet.Start(serverIntent); err != nil {
return nil, err
}
case fdv2proto.EventPutObject:
{
r := jreader.NewReader(event.Data)

var (
key string
kind fdv2proto.ObjectKind
item ldstoretypes.ItemDescriptor
err error
version int
)

for obj := r.Object().WithRequiredProperties([]string{
versionField, kindField, keyField, objectField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
case keyField:
key = r.String()
case objectField:
item, err = parseItem(r, kind)
if err != nil {
return nil, err
}
}
}
updates = append(updates, fdv2proto.PutObject{Kind: kind, Key: key, Object: item, Version: version})
var put fdv2proto.PutObject
if err := json.Unmarshal(event.Data, &put); err != nil {
return nil, err
}
changeSet.AddPut(put.Kind, put.Key, put.Version, put.Object)
case fdv2proto.EventDeleteObject:
{
r := jreader.NewReader(event.Data)

var (
version int
kind fdv2proto.ObjectKind
key string
)

for obj := r.Object().WithRequiredProperties([]string{versionField, kindField, keyField}); obj.Next(); {
switch string(obj.Name()) {
case versionField:
version = r.Int()
case kindField:
kind = fdv2proto.ObjectKind(r.String())
//nolint:godox
// TODO: An unrecognized kind should be ignored for forwards compat; the question is,
// do we throw out the DeleteObject here, or let the SDK's store handle it?
case keyField:
key = r.String()
}
}
updates = append(updates, fdv2proto.DeleteObject{Kind: kind, Key: key, Version: version})
var deleteObject fdv2proto.DeleteObject
if err := json.Unmarshal(event.Data, &deleteObject); err != nil {
return nil, err
}
changeSet.AddDelete(deleteObject.Kind, deleteObject.Key, deleteObject.Version)
case fdv2proto.EventPayloadTransferred:
//nolint:godox
// TODO: deserialize the state and create a fdv2proto.Selector.
var selector fdv2proto.Selector
if err := json.Unmarshal(event.Data, &selector); err != nil {
return nil, err
}
return changeSet.Finish(selector)
}
}

if intentCode == "" {
return nil, errors.New("no server-intent event found in polling response")
}

return NewPollingResponse(intentCode, updates, fdv2proto.NoSelector()), nil
return nil, fmt.Errorf("didn't receive any known protocol events in polling payload")
}

func (r *pollingRequester) makeRequest(resource string) ([]byte, bool, error) {
Expand Down
48 changes: 0 additions & 48 deletions internal/datasourcev2/polling_response.go

This file was deleted.

Loading

0 comments on commit 67b411e

Please sign in to comment.