Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mredolatti committed Oct 31, 2023
1 parent 353237e commit 4867a2d
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 38 deletions.
2 changes: 1 addition & 1 deletion splitio/commitversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ This file is created automatically, please do not edit
*/

// CommitVersion is the version of the last commit previous to release
const CommitVersion = "fa204db"
const CommitVersion = "353237e"
15 changes: 12 additions & 3 deletions splitio/proxy/controllers/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"errors"
"fmt"
"net/http"
"slices"
"strconv"
"strings"

"github.com/gin-gonic/gin"
"github.com/splitio/go-split-commons/v5/dtos"
Expand Down Expand Up @@ -52,9 +54,16 @@ func (c *SdkServerController) SplitChanges(ctx *gin.Context) {
if err != nil {
since = -1
}

sets := strings.Split(ctx.Query("sets"), ",")
if !slices.IsSorted(sets) {
c.logger.Warning(fmt.Sprintf("SDK [%s] is sending flagsets unordered.", ctx.Request.Header.Get("SplitSDKVersion"))) // TODO(mredolatti): get this header properly
slices.Sort(sets)
}

c.logger.Debug(fmt.Sprintf("SDK Fetches Feature Flags Since: %d", since))

splits, err := c.fetchSplitChangesSince(since)
splits, err := c.fetchSplitChangesSince(since, sets)
if err != nil {
c.logger.Error("error fetching splitChanges payload from storage: ", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
Expand Down Expand Up @@ -112,8 +121,8 @@ func (c *SdkServerController) MySegments(ctx *gin.Context) {
ctx.Set(caching.SurrogateContextKey, caching.MakeSurrogateForMySegments(mySegments))
}

func (c *SdkServerController) fetchSplitChangesSince(since int64) (*dtos.SplitChangesDTO, error) {
splits, err := c.proxySplitStorage.ChangesSince(since)
func (c *SdkServerController) fetchSplitChangesSince(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
splits, err := c.proxySplitStorage.ChangesSince(since, sets)
if err == nil {
return splits, nil
}
Expand Down
6 changes: 3 additions & 3 deletions splitio/proxy/controllers/sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestSplitChangesCachedRecipe(t *testing.T) {
},
},
&psmocks.ProxySplitStorageMock{
ChangesSinceCall: func(since int64) (*dtos.SplitChangesDTO, error) {
ChangesSinceCall: func(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
if since != -1 {
t.Error("since should be -1")
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestSplitChangesNonCachedRecipe(t *testing.T) {
},
},
&psmocks.ProxySplitStorageMock{
ChangesSinceCall: func(since int64) (*dtos.SplitChangesDTO, error) {
ChangesSinceCall: func(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
if since != -1 {
t.Error("since should be -1")
}
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestSplitChangesNonCachedRecipeAndFetchFails(t *testing.T) {
},
},
&psmocks.ProxySplitStorageMock{
ChangesSinceCall: func(since int64) (*dtos.SplitChangesDTO, error) {
ChangesSinceCall: func(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
if since != -1 {
t.Error("since should be -1")
}
Expand Down
4 changes: 3 additions & 1 deletion splitio/proxy/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"

"github.com/splitio/go-split-commons/v5/conf"
"github.com/splitio/go-split-commons/v5/flagsets"
"github.com/splitio/go-split-commons/v5/service/api"
"github.com/splitio/go-split-commons/v5/synchronizer"
"github.com/splitio/go-split-commons/v5/tasks"
Expand Down Expand Up @@ -76,7 +77,8 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error {
splitAPI := api.NewSplitAPI(cfg.Apikey, *advanced, logger, metadata)

// Proxy storages already implement the observable interface, so no need to wrap them
splitStorage := storage.NewProxySplitStorage(dbInstance, logger, cfg.Initialization.Snapshot != "")
// TODO(mredolatti): add a config for flagsets and build it properly here
splitStorage := storage.NewProxySplitStorage(dbInstance, logger, flagsets.NewFlagSetFilter(nil), cfg.Initialization.Snapshot != "")
segmentStorage := storage.NewProxySegmentStorage(dbInstance, logger, cfg.Initialization.Snapshot != "")

// Local telemetry
Expand Down
2 changes: 1 addition & 1 deletion splitio/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestSplitChangesEndpoints(t *testing.T) {
opts := makeOpts()
var changesSinceCalls int64 = 0
opts.ProxySplitStorage = &pstorageMocks.ProxySplitStorageMock{
ChangesSinceCall: func(since int64) (*dtos.SplitChangesDTO, error) {
ChangesSinceCall: func(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
atomic.AddInt64(&changesSinceCalls, 1)
return &dtos.SplitChangesDTO{
Since: since,
Expand Down
6 changes: 3 additions & 3 deletions splitio/proxy/storage/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ import (
)

type ProxySplitStorageMock struct {
ChangesSinceCall func(since int64) (*dtos.SplitChangesDTO, error)
ChangesSinceCall func(since int64, sets []string) (*dtos.SplitChangesDTO, error)
RegisterOlderCnCall func(payload *dtos.SplitChangesDTO)
}

func (p *ProxySplitStorageMock) ChangesSince(since int64) (*dtos.SplitChangesDTO, error) {
return p.ChangesSinceCall(since)
func (p *ProxySplitStorageMock) ChangesSince(since int64, sets []string) (*dtos.SplitChangesDTO, error) {
return p.ChangesSinceCall(since, sets)
}

func (p *ProxySplitStorageMock) RegisterOlderCn(payload *dtos.SplitChangesDTO) {
Expand Down
8 changes: 8 additions & 0 deletions splitio/proxy/storage/optimized/historic.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ func (f *FeatureView) clone() FeatureView {

}

func (f *FeatureView) FlagSetNames() []string {
toRet := make([]string, len(f.FlagSets))
for idx := range f.FlagSets {
toRet[idx] = f.FlagSets[idx].Name
}
return toRet
}

func copyAndFilter(views []FeatureView, sets []string, since int64) []FeatureView {
// precondition: f.Flagsets is sorted by name
// precondition: sets is sorted
Expand Down
83 changes: 62 additions & 21 deletions splitio/proxy/storage/splits.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var ErrSummaryNotCached = errors.New("summary for requested change number not ca
// ProxySplitStorage defines the interface of a storage that can be used for serving splitChanges payloads
// for different requested `since` parameters
type ProxySplitStorage interface {
ChangesSince(since int64) (*dtos.SplitChangesDTO, error)
ChangesSince(since int64, flagSets []string) (*dtos.SplitChangesDTO, error)
RegisterOlderCn(payload *dtos.SplitChangesDTO)
}

Expand All @@ -36,15 +36,17 @@ type ProxySplitStorageImpl struct {
snapshot mutexmap.MMSplitStorage
recipes *optimized.SplitChangesSummaries
db *persistent.SplitChangesCollection
flagSets flagsets.FlagSetFilter
historic optimized.HistoricChanges
mtx sync.Mutex
}

// NewProxySplitStorage instantiates a new proxy storage that wraps an in-memory snapshot of the last known,
// flag configuration, a changes summaries containing recipes to update SDKs with different CNs, and a persistent storage
// for snapshot purposes
func NewProxySplitStorage(db persistent.DBWrapper, logger logging.LoggerInterface, restoreBackup bool) *ProxySplitStorageImpl {
func NewProxySplitStorage(db persistent.DBWrapper, logger logging.LoggerInterface, flagSets flagsets.FlagSetFilter, restoreBackup bool) *ProxySplitStorageImpl {
disk := persistent.NewSplitChangesCollection(db, logger)
snapshot := mutexmap.NewMMSplitStorage(flagsets.NewFlagSetFilter(nil)) // TODO(mredolatti): fix this
snapshot := mutexmap.NewMMSplitStorage(flagSets) // TODO(mredolatti): fix this
recipes := optimized.NewSplitChangesSummaries(maxRecipes)
if restoreBackup {
snapshotFromDisk(snapshot, recipes, disk, logger)
Expand All @@ -53,13 +55,15 @@ func NewProxySplitStorage(db persistent.DBWrapper, logger logging.LoggerInterfac
snapshot: *snapshot,
recipes: recipes,
db: disk,
flagSets: flagSets,
}
}

// ChangesSince builds a SplitChanges payload to from `since` to the latest known CN
func (p *ProxySplitStorageImpl) ChangesSince(since int64) (*dtos.SplitChangesDTO, error) {
// Special case of -1, return all
if since == -1 {
func (p *ProxySplitStorageImpl) ChangesSince(since int64, flagSets []string) (*dtos.SplitChangesDTO, error) {

// No flagsets and fetching from -1, return the current snapshot
if since == -1 && len(flagSets) == 0 {
cn, err := p.snapshot.ChangeNumber()
if err != nil {
return nil, fmt.Errorf("error fetching changeNumber from snapshot: %w", err)
Expand All @@ -68,26 +72,26 @@ func (p *ProxySplitStorageImpl) ChangesSince(since int64) (*dtos.SplitChangesDTO
return &dtos.SplitChangesDTO{Since: since, Till: cn, Splits: all}, nil
}

summary, till, err := p.recipes.FetchSince(int64(since))
if err != nil {
if errors.Is(err, optimized.ErrUnknownChangeNumber) {
return nil, ErrSummaryNotCached
views := p.historic.GetUpdatedSince(since, flagSets)
namesToFetch := make([]string, 0, len(views))
all := make([]dtos.SplitDTO, 0, len(views))
//splitsToArchive := make([]optimized.FeatureView, 0, len(views))
var till int64
for idx := range views {
if t := views[idx].LastUpdated; t > till {
till = t
}
if views[idx].Active {
namesToFetch = append(namesToFetch, views[idx].Name)
} else {
all = append(all, archivedDTOForView(&views[idx]))
}
return nil, fmt.Errorf("unexpected error when fetching changes summary: %w", err)
}

// Regular flow
splitNames := make([]string, 0, len(summary.Updated))
for name := range summary.Updated {
splitNames = append(splitNames, name)
}

active := p.snapshot.FetchMany(splitNames)
all := make([]dtos.SplitDTO, 0, len(summary.Removed)+len(summary.Updated))
for _, split := range active {
for _, split := range p.snapshot.FetchMany(namesToFetch) {
all = append(all, *split)
}
all = append(all, optimized.BuildArchivedSplitsFor(summary.Removed)...)

return &dtos.SplitChangesDTO{Since: since, Till: till, Splits: all}, nil
}

Expand All @@ -105,6 +109,7 @@ func (p *ProxySplitStorageImpl) Update(toAdd []dtos.SplitDTO, toRemove []dtos.Sp

p.mtx.Lock()
p.snapshot.Update(toAdd, toRemove, changeNumber)
p.historic.Update(toAdd, toRemove, changeNumber)
p.recipes.AddChanges(toAdd, toRemove, changeNumber)
p.db.Update(toAdd, toRemove, changeNumber)
p.mtx.Unlock()
Expand Down Expand Up @@ -191,6 +196,42 @@ func snapshotFromDisk(dst *mutexmap.MMSplitStorage, summary *optimized.SplitChan
summary.AddChanges(filtered, nil, cn)
}

func archivedDTOForView(view *optimized.FeatureView) dtos.SplitDTO {
return dtos.SplitDTO{
ChangeNumber: 1,
TrafficTypeName: view.TrafficTypeName,
Name: view.Name,
TrafficAllocation: 100,
TrafficAllocationSeed: 0,
Seed: 0,
Status: "ARCHIVED",
Killed: false,
DefaultTreatment: "off",
Algo: 1,
Conditions: make([]dtos.ConditionDTO, 0),
Sets: view.FlagSetNames(),
}
}

func appendArchivedSplitsForViews(views []optimized.FeatureView, dst *[]dtos.SplitDTO) {
for idx := range views {
*dst = append(*dst, dtos.SplitDTO{
ChangeNumber: 1,
TrafficTypeName: views[idx].TrafficTypeName,
Name: views[idx].Name,
TrafficAllocation: 100,
TrafficAllocationSeed: 0,
Seed: 0,
Status: "ARCHIVED",
Killed: false,
DefaultTreatment: "off",
Algo: 1,
Conditions: make([]dtos.ConditionDTO, 0),
Sets: views[idx].FlagSetNames(),
})
}
}

var _ ProxySplitStorage = (*ProxySplitStorageImpl)(nil)
var _ storage.SplitStorage = (*ProxySplitStorageImpl)(nil)
var _ observability.ObservableSplitStorage = (*ProxySplitStorageImpl)(nil)
91 changes: 86 additions & 5 deletions splitio/proxy/storage/splits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import (
"github.com/splitio/split-synchronizer/v5/splitio/proxy/storage/persistent"

"github.com/splitio/go-split-commons/v5/dtos"
"github.com/splitio/go-split-commons/v5/flagsets"
"github.com/splitio/go-toolkit/v5/logging"

"github.com/stretchr/testify/assert"
)

func TestSplitStorage(t *testing.T) {
Expand All @@ -19,11 +22,11 @@ func TestSplitStorage(t *testing.T) {
splitC := persistent.NewSplitChangesCollection(dbw, logger)

splitC.Update([]dtos.SplitDTO{
{Name: "s1", ChangeNumber: 1, Status: "ACTIVE"},
{Name: "s2", ChangeNumber: 2, Status: "ACTIVE"},
{Name: "f1", ChangeNumber: 1, Status: "ACTIVE"},
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE"},
}, nil, 1)

pss := NewProxySplitStorage(dbw, logger, true)
pss := NewProxySplitStorage(dbw, logger, flagsets.NewFlagSetFilter(nil), true)

sinceMinus1, currentCN, err := pss.recipes.FetchSince(-1)
if err != nil {
Expand All @@ -34,11 +37,11 @@ func TestSplitStorage(t *testing.T) {
t.Error("current cn should be 2. Is: ", currentCN)
}

if _, ok := sinceMinus1.Updated["s1"]; !ok {
if _, ok := sinceMinus1.Updated["f1"]; !ok {
t.Error("s1 should be added")
}

if _, ok := sinceMinus1.Updated["s2"]; !ok {
if _, ok := sinceMinus1.Updated["f2"]; !ok {
t.Error("s2 should be added")
}

Expand All @@ -58,5 +61,83 @@ func TestSplitStorage(t *testing.T) {
if len(since2.Removed) != 0 {
t.Error("nothing should have been removed")
}
}

func TestSplitStorageWithFlagsets(t *testing.T) {
dbw, err := persistent.NewBoltWrapper(persistent.BoltInMemoryMode, nil)
if err != nil {
t.Error("error creating bolt wrapper: ", err)
}

logger := logging.NewLogger(nil)

pss := NewProxySplitStorage(dbw, logger, flagsets.NewFlagSetFilter(nil), true)

pss.Update([]dtos.SplitDTO{
{Name: "f1", ChangeNumber: 1, Status: "ACTIVE", Sets: []string{"s1", "s2"}},
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE", Sets: []string{"s2", "s3"}},
}, nil, 2)

res, err := pss.ChangesSince(-1, nil)
assert.Nil(t, err)
assert.Equal(t, int64(-1), res.Since)
assert.Equal(t, int64(2), res.Till)
assert.ElementsMatch(t, []dtos.SplitDTO{
{Name: "f1", ChangeNumber: 1, Status: "ACTIVE", Sets: []string{"s1", "s2"}},
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE", Sets: []string{"s2", "s3"}},
}, res.Splits)

// check for s1
res, err = pss.ChangesSince(-1, []string{"s1"})
assert.Nil(t, err)
assert.Equal(t, int64(-1), res.Since)
assert.Equal(t, int64(1), res.Till)
assert.ElementsMatch(t, []dtos.SplitDTO{
{Name: "f1", ChangeNumber: 1, Status: "ACTIVE", Sets: []string{"s1", "s2"}},
}, res.Splits)

// check for s2
res, err = pss.ChangesSince(-1, []string{"s2"})
assert.Nil(t, err)
assert.Equal(t, int64(-1), res.Since)
assert.Equal(t, int64(2), res.Till)
assert.ElementsMatch(t, []dtos.SplitDTO{
{Name: "f1", ChangeNumber: 1, Status: "ACTIVE", Sets: []string{"s1", "s2"}},
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE", Sets: []string{"s2", "s3"}},
}, res.Splits)

// check for s3
res, err = pss.ChangesSince(-1, []string{"s3"})
assert.Nil(t, err)
assert.Equal(t, int64(-1), res.Since)
assert.Equal(t, int64(2), res.Till)
assert.ElementsMatch(t, []dtos.SplitDTO{
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE", Sets: []string{"s2", "s3"}},
}, res.Splits)

// ---------------------------

// remove f1 from s2
pss.Update([]dtos.SplitDTO{
{Name: "f1", ChangeNumber: 3, Status: "ACTIVE", Sets: []string{"s1"}},
}, nil, 2)

// fetching from -1 only returns f1
res, err = pss.ChangesSince(-1, []string{"s2"})
assert.Nil(t, err)
assert.Equal(t, int64(-1), res.Since)
assert.Equal(t, int64(2), res.Till)
assert.ElementsMatch(t, []dtos.SplitDTO{
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE", Sets: []string{"s2", "s3"}},
}, res.Splits)

// fetching from -1 only returns f1
res, err = pss.ChangesSince(-1, []string{"s2"})
assert.Nil(t, err)
assert.Equal(t, int64(-1), res.Since)
assert.Equal(t, int64(2), res.Till)
assert.ElementsMatch(t, []dtos.SplitDTO{
{Name: "f2", ChangeNumber: 2, Status: "ACTIVE", Sets: []string{"s2", "s3"}},
}, res.Splits)

}

0 comments on commit 4867a2d

Please sign in to comment.