Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDKS-9043] Cache aware large segment Implementation #289

Merged
merged 8 commits into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.3.0
github.com/splitio/gincache v1.0.1
github.com/splitio/go-split-commons/v6 v6.0.1
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f
github.com/splitio/go-toolkit/v5 v5.4.0
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU=
github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY=
github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f h1:FbR7KHn2kdtBda7CZVxut2qfFH7ZI4XW+FV6B0JpWgM=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY=
github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
4 changes: 3 additions & 1 deletion splitio/proxy/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ const (
// SplitSurrogate key (we only need one, since all splitChanges should be expired when an update is processed)
SplitSurrogate = "sp"

// LargeSegmentSurrogate key (we only need one, since all memberships should be expired when an update is processed)
LargeSegmentSurrogate = "ls"

// AuthSurrogate key (having push disabled, it's safe to cache this and return it on all requests)
AuthSurrogate = "au"

Expand Down Expand Up @@ -60,7 +63,6 @@ func MakeProxyCache() *gincache.Middleware {
}

func keyFactoryFN(ctx *gin.Context) string {

var encodingPrefix string
if strings.Contains(ctx.Request.Header.Get("Accept-Encoding"), "gzip") {
encodingPrefix = "gzip::"
Expand Down
189 changes: 189 additions & 0 deletions splitio/proxy/caching/mocks/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package mocks

import (
"github.com/splitio/gincache"
"github.com/splitio/go-split-commons/v6/dtos"
"github.com/splitio/go-split-commons/v6/storage"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
"github.com/splitio/go-toolkit/v5/datastructures/set"
"github.com/stretchr/testify/mock"
)

// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there

type SplitUpdaterMock struct {
mock.Mock
}

// LocalKill implements split.Updater
func (s *SplitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) {
s.Called(splitName, defaultTreatment, changeNumber)
}

// SynchronizeFeatureFlags implements split.Updater
func (s *SplitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) {
args := s.Called(ffChange)
return args.Get(0).(*split.UpdateResult), args.Error(1)
}

// SynchronizeSplits implements split.Updater
func (s *SplitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) {
args := s.Called(till)
return args.Get(0).(*split.UpdateResult), args.Error(1)
}

// ----

type CacheFlusherMock struct {
mock.Mock
}

func (c *CacheFlusherMock) Evict(key string) { c.Called(key) }
func (c *CacheFlusherMock) EvictAll() { c.Called() }
func (c *CacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) }

// ---

type SplitStorageMock struct {
mock.Mock
}

func (s *SplitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") }
func (s *SplitStorageMock) ChangeNumber() (int64, error) {
args := s.Called()
return args.Get(0).(int64), args.Error(1)
}

func (*SplitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO {
panic("unimplemented")
}
func (*SplitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string {
panic("unimplemented")
}
func (*SplitStorageMock) GetAllFlagSetNames() []string {
panic("unimplemented")
}
func (*SplitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) {
panic("unimplemented")
}
func (s *SplitStorageMock) SegmentNames() *set.ThreadUnsafeSet {
return s.Called().Get(0).(*set.ThreadUnsafeSet)
}
func (s *SplitStorageMock) LargeSegmentNames() *set.ThreadUnsafeSet {
return s.Called().Get(0).(*set.ThreadUnsafeSet)
}
func (s *SplitStorageMock) SetChangeNumber(changeNumber int64) error {
return s.Called(changeNumber).Error(0)
}
func (*SplitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") }
func (*SplitStorageMock) SplitNames() []string { panic("unimplemented") }
func (*SplitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") }
func (*SplitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) {
panic("unimplemented")
}

type SegmentUpdaterMock struct {
mock.Mock
}

func (s *SegmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") }
func (s *SegmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") }

func (s *SegmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) {
args := s.Called(name, till)
return args.Get(0).(*segment.UpdateResult), args.Error(1)
}

func (s *SegmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) {
args := s.Called()
return args.Get(0).(map[string]segment.UpdateResult), args.Error(1)
}

type SegmentStorageMock struct {
mock.Mock
}

func (*SegmentStorageMock) SetChangeNumber(segmentName string, till int64) error {
panic("unimplemented")
}
func (s *SegmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error {
return s.Called(name, toAdd, toRemove, changeNumber).Error(0)
}

// ChangeNumber implements storage.SegmentStorage
func (s *SegmentStorageMock) ChangeNumber(segmentName string) (int64, error) {
args := s.Called(segmentName)
return args.Get(0).(int64), args.Error(1)
}

func (*SegmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") }
func (*SegmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) {
panic("unimplemented")
}
func (*SegmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") }

// ---

type LargeSegmentStorageMock struct {
mock.Mock
}

func (s *LargeSegmentStorageMock) SetChangeNumber(name string, till int64) {
s.Called(name, till).Error(0)
}

func (s *LargeSegmentStorageMock) Update(name string, userKeys []string, till int64) {
s.Called(name, userKeys, till)
}
func (s *LargeSegmentStorageMock) ChangeNumber(name string) int64 {
args := s.Called(name)
return args.Get(0).(int64)
}
func (s *LargeSegmentStorageMock) Count() int {
args := s.Called()
return args.Get(0).(int)
}
func (s *LargeSegmentStorageMock) LargeSegmentsForUser(userKey string) []string {
return []string{}
}
func (s *LargeSegmentStorageMock) IsInLargeSegment(name string, key string) (bool, error) {
args := s.Called(name, key)
return args.Get(0).(bool), args.Error(1)
}
func (s *LargeSegmentStorageMock) TotalKeys(name string) int {
return s.Called(name).Get(0).(int)
}

// ---

type LargeSegmentUpdaterMock struct {
mock.Mock
}

func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegment(name string, till *int64) (*int64, error) {
args := u.Called(name, till)
return args.Get(0).(*int64), args.Error(1)
}
func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegments() (map[string]*int64, error) {
args := u.Called()
return args.Get(0).(map[string]*int64), args.Error(1)
}
func (u *LargeSegmentUpdaterMock) IsCached(name string) bool {
return u.Called().Get(0).(bool)
}
func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) {
args := u.Called(lsRFDResponseDTO)
return args.Get(0).(*int64), args.Error(1)
}

// ---

var _ gincache.CacheFlusher = (*CacheFlusherMock)(nil)
var _ split.Updater = (*SplitUpdaterMock)(nil)
var _ segment.Updater = (*SegmentUpdaterMock)(nil)
var _ largesegment.Updater = (*LargeSegmentUpdaterMock)(nil)
var _ storage.SplitStorage = (*SplitStorageMock)(nil)
var _ storage.SegmentStorage = (*SegmentStorageMock)(nil)
var _ storage.LargeSegmentsStorage = (*LargeSegmentStorageMock)(nil)
74 changes: 74 additions & 0 deletions splitio/proxy/caching/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/splitio/go-split-commons/v6/healthcheck/application"
"github.com/splitio/go-split-commons/v6/service"
"github.com/splitio/go-split-commons/v6/storage"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
"github.com/splitio/go-toolkit/v5/logging"
Expand Down Expand Up @@ -98,6 +99,7 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegment(name string, till *in
result, err := c.wrapped.SynchronizeSegment(name, till)
if current := result.NewChangeNumber; current > previous || (previous != -1 && current == -1) {
c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(name))
c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate)
}

// remove individual entries for each affected key
Expand Down Expand Up @@ -129,6 +131,7 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegments() (map[string]segmen
if pcn, _ := previousCNs[segmentName]; ccn > pcn || (pcn > 0 && ccn == -1) {
// if the segment was updated or the segment was removed, evict it
c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(segmentName))
c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate)
}

for idx := range result.UpdatedKeys {
Expand All @@ -151,3 +154,74 @@ func (c *CacheAwareSegmentSynchronizer) SegmentNames() []interface{} {
func (c *CacheAwareSegmentSynchronizer) IsSegmentCached(segmentName string) bool {
return c.wrapped.IsSegmentCached(segmentName)
}

// CacheAwareLargeSegmentSynchronizer
type CacheAwareLargeSegmentSynchronizer struct {
wrapped largesegment.Updater
largeSegmentStorage storage.LargeSegmentsStorage
cacheFlusher gincache.CacheFlusher
splitStorage storage.SplitStorage
}

func NewCacheAwareLargeSegmentSync(
splitStorage storage.SplitStorage,
largeSegmentStorage storage.LargeSegmentsStorage,
largeSegmentFetcher service.LargeSegmentFetcher,
logger logging.LoggerInterface,
runtimeTelemetry storage.TelemetryRuntimeProducer,
cacheFlusher gincache.CacheFlusher,
appMonitor application.MonitorProducerInterface,
) *CacheAwareLargeSegmentSynchronizer {
return &CacheAwareLargeSegmentSynchronizer{
wrapped: largesegment.NewLargeSegmentUpdater(splitStorage, largeSegmentStorage, largeSegmentFetcher, logger, runtimeTelemetry, appMonitor),
cacheFlusher: cacheFlusher,
largeSegmentStorage: largeSegmentStorage,
splitStorage: splitStorage,
}
}

func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string, till *int64) (*int64, error) {
previous := c.largeSegmentStorage.ChangeNumber(name)
newCN, err := c.wrapped.SynchronizeLargeSegment(name, till)

c.evictByLargeSegmentSurrogate(previous, *newCN)

return newCN, err
}

func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[string]*int64, error) {
previousLargeSegmentNames := c.splitStorage.LargeSegmentNames()
previousCNs := make(map[string]int64, previousLargeSegmentNames.Size())
for _, name := range previousLargeSegmentNames.List() {
if strName, ok := name.(string); ok {
cn := c.largeSegmentStorage.ChangeNumber(strName)
previousCNs[strName] = cn
}
}

results, err := c.wrapped.SynchronizeLargeSegments()
for name, currentCN := range results {
c.evictByLargeSegmentSurrogate(previousCNs[name], *currentCN)
}

return results, err
}

func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool {
return c.wrapped.IsCached(name)
}

func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) {
previous := c.largeSegmentStorage.ChangeNumber(lsRFDResponseDTO.Name)
newCN, err := c.wrapped.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO)

c.evictByLargeSegmentSurrogate(previous, *newCN)

return newCN, err
}

func (c *CacheAwareLargeSegmentSynchronizer) evictByLargeSegmentSurrogate(previousCN int64, currentCN int64) {
if currentCN > previousCN || currentCN == -1 {
c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate)
}
}
Loading