Skip to content

Commit

Permalink
LargeSegment caching implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
sanzmauro committed Nov 19, 2024
1 parent 319e79e commit 4836d47
Show file tree
Hide file tree
Showing 7 changed files with 368 additions and 139 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/gin-gonic/gin v1.10.0
github.com/google/uuid v1.3.0
github.com/splitio/gincache v1.0.1
github.com/splitio/go-split-commons/v6 v6.0.1
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941
github.com/splitio/go-toolkit/v5 v5.4.0
github.com/stretchr/testify v1.9.0
go.etcd.io/bbolt v1.3.6
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IX
github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY=
github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY=
github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489 h1:9sr63h4Kco1TSPtwaiECfRYfNvDMYtvQL2q4r62drDo=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 h1:Rzpm7u9uIaTsQDvSWRPsMbmBpZeg2kUQkVt0+30ubj4=
github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc=
github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM=
github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
16 changes: 15 additions & 1 deletion splitio/proxy/caching/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,17 @@ const (
AuthSurrogate = "au"

segmentPrefix = "se::"

largeSegmentPrefix = "ls::"
)

const cacheSize = 1000000

// MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried
func MakeSurrogateForLargeSegmentChanges(name string) string {
return largeSegmentPrefix + name
}

// MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried
func MakeSurrogateForSegmentChanges(segmentName string) string {
return segmentPrefix + segmentName
Expand All @@ -38,6 +45,14 @@ func MakeSurrogateForMySegments(mysegments []dtos.MySegmentDTO) []string {
return nil
}

// MakeMembershipsEntries create a cache entry key for Memberships
func MakeMembershipsEntries(key string) []string {
return []string{
"/api/memberships/" + key,
"gzip::/api/memberships/" + key,
}
}

// MakeMySegmentsEntry create a cache entry key for mysegments
func MakeMySegmentsEntries(key string) []string {
return []string{
Expand All @@ -60,7 +75,6 @@ func MakeProxyCache() *gincache.Middleware {
}

func keyFactoryFN(ctx *gin.Context) string {

var encodingPrefix string
if strings.Contains(ctx.Request.Header.Get("Accept-Encoding"), "gzip") {
encodingPrefix = "gzip::"
Expand Down
182 changes: 182 additions & 0 deletions splitio/proxy/caching/mocks/mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package mocks

import (
"github.com/splitio/gincache"
"github.com/splitio/go-split-commons/v6/dtos"
"github.com/splitio/go-split-commons/v6/storage"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
"github.com/splitio/go-toolkit/v5/datastructures/set"
"github.com/stretchr/testify/mock"
)

// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there

type SplitUpdaterMock struct {
mock.Mock
}

// LocalKill implements split.Updater
func (s *SplitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) {
s.Called(splitName, defaultTreatment, changeNumber)
}

// SynchronizeFeatureFlags implements split.Updater
func (s *SplitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) {
args := s.Called(ffChange)
return args.Get(0).(*split.UpdateResult), args.Error(1)
}

// SynchronizeSplits implements split.Updater
func (s *SplitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) {
args := s.Called(till)
return args.Get(0).(*split.UpdateResult), args.Error(1)
}

// ----

type CacheFlusherMock struct {
mock.Mock
}

func (c *CacheFlusherMock) Evict(key string) { c.Called(key) }
func (c *CacheFlusherMock) EvictAll() { c.Called() }
func (c *CacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) }

// ---

type SplitStorageMock struct {
mock.Mock
}

func (s *SplitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") }
func (s *SplitStorageMock) ChangeNumber() (int64, error) {
args := s.Called()
return args.Get(0).(int64), args.Error(1)
}

func (*SplitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO {
panic("unimplemented")
}
func (*SplitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string {
panic("unimplemented")
}
func (*SplitStorageMock) GetAllFlagSetNames() []string {
panic("unimplemented")
}
func (*SplitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) {
panic("unimplemented")
}
func (s *SplitStorageMock) SegmentNames() *set.ThreadUnsafeSet {
return s.Called().Get(0).(*set.ThreadUnsafeSet)
}
func (s *SplitStorageMock) LargeSegmentNames() *set.ThreadUnsafeSet {
return s.Called().Get(0).(*set.ThreadUnsafeSet)
}
func (s *SplitStorageMock) SetChangeNumber(changeNumber int64) error {
return s.Called(changeNumber).Error(0)
}
func (*SplitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") }
func (*SplitStorageMock) SplitNames() []string { panic("unimplemented") }
func (*SplitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") }
func (*SplitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) {
panic("unimplemented")
}

type SegmentUpdaterMock struct {
mock.Mock
}

func (s *SegmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") }
func (s *SegmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") }

func (s *SegmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) {
args := s.Called(name, till)
return args.Get(0).(*segment.UpdateResult), args.Error(1)
}

func (s *SegmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) {
args := s.Called()
return args.Get(0).(map[string]segment.UpdateResult), args.Error(1)
}

type SegmentStorageMock struct {
mock.Mock
}

func (*SegmentStorageMock) SetChangeNumber(segmentName string, till int64) error {
panic("unimplemented")
}
func (s *SegmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error {
return s.Called(name, toAdd, toRemove, changeNumber).Error(0)
}

// ChangeNumber implements storage.SegmentStorage
func (s *SegmentStorageMock) ChangeNumber(segmentName string) (int64, error) {
args := s.Called(segmentName)
return args.Get(0).(int64), args.Error(1)
}

func (*SegmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") }
func (*SegmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) {
panic("unimplemented")
}
func (*SegmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") }

// ---

type LargeSegmentStorageMock struct {
mock.Mock
}

func (s *LargeSegmentStorageMock) SetChangeNumber(name string, till int64) {
s.Called(name, till).Error(0)
}

func (s *LargeSegmentStorageMock) Update(name string, userKeys []string, till int64) {
s.Called(name, userKeys, till)
}
func (s *LargeSegmentStorageMock) ChangeNumber(name string) int64 {
args := s.Called(name)
return args.Get(0).(int64)
}
func (s *LargeSegmentStorageMock) Count() int {
args := s.Called()
return args.Get(0).(int)
}
func (s *LargeSegmentStorageMock) LargeSegmentsForUser(userKey string) []string {
return []string{}
}
func (s *LargeSegmentStorageMock) ContainsKey(name string, key string) (bool, error) {
args := s.Called(name, key)
return args.Get(0).(bool), args.Error(1)
}

// ---

type LargeSegmentUpdaterMock struct {
mock.Mock
}

func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegment(name string, till *int64) (*int64, error) {
args := u.Called(name, till)
return args.Get(0).(*int64), args.Error(1)
}
func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegments() (map[string]*int64, error) {
args := u.Called()
return args.Get(0).(map[string]*int64), args.Error(1)
}
func (u *LargeSegmentUpdaterMock) IsCached(name string) bool {
return false
}

// ---

var _ gincache.CacheFlusher = (*CacheFlusherMock)(nil)
var _ split.Updater = (*SplitUpdaterMock)(nil)
var _ segment.Updater = (*SegmentUpdaterMock)(nil)
var _ largesegment.Updater = (*LargeSegmentUpdaterMock)(nil)
var _ storage.SplitStorage = (*SplitStorageMock)(nil)
var _ storage.SegmentStorage = (*SegmentStorageMock)(nil)
var _ storage.LargeSegmentsStorage = (*LargeSegmentStorageMock)(nil)
63 changes: 63 additions & 0 deletions splitio/proxy/caching/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/splitio/go-split-commons/v6/healthcheck/application"
"github.com/splitio/go-split-commons/v6/service"
"github.com/splitio/go-split-commons/v6/storage"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/segment"
"github.com/splitio/go-split-commons/v6/synchronizer/worker/split"
"github.com/splitio/go-toolkit/v5/logging"
Expand Down Expand Up @@ -151,3 +152,65 @@ func (c *CacheAwareSegmentSynchronizer) SegmentNames() []interface{} {
func (c *CacheAwareSegmentSynchronizer) IsSegmentCached(segmentName string) bool {
return c.wrapped.IsSegmentCached(segmentName)
}

// CacheAwareLargeSegmentSynchronizer
type CacheAwareLargeSegmentSynchronizer struct {
wrapped largesegment.Updater
largeSegmentStorage storage.LargeSegmentsStorage
cacheFlusher gincache.CacheFlusher
splitStorage storage.SplitStorage
}

func NewCacheAwareLargeSegmentSync(
splitStorage storage.SplitStorage,
largeSegmentStorage storage.LargeSegmentsStorage,
largeSegmentFetcher service.LargeSegmentFetcher,
logger logging.LoggerInterface,
runtimeTelemetry storage.TelemetryRuntimeProducer,
cacheFlusher gincache.CacheFlusher,
appMonitor application.MonitorProducerInterface,
) *CacheAwareLargeSegmentSynchronizer {
return &CacheAwareLargeSegmentSynchronizer{
wrapped: largesegment.NewLargeSegmentUpdater(splitStorage, largeSegmentStorage, largeSegmentFetcher, logger, runtimeTelemetry, appMonitor),
cacheFlusher: cacheFlusher,
largeSegmentStorage: largeSegmentStorage,
splitStorage: splitStorage,
}
}

func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string, till *int64) (*int64, error) {
previous := c.largeSegmentStorage.ChangeNumber(name)
newCN, err := c.wrapped.SynchronizeLargeSegment(name, till)

c.shouldEvictBySurrogate(name, previous, *newCN)

return newCN, err
}

func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[string]*int64, error) {
previousLargeSegmentNames := c.splitStorage.LargeSegmentNames()
previousCNs := make(map[string]int64, previousLargeSegmentNames.Size())
for _, name := range previousLargeSegmentNames.List() {
if strName, ok := name.(string); ok {
cn := c.largeSegmentStorage.ChangeNumber(strName)
previousCNs[strName] = cn
}
}

results, err := c.wrapped.SynchronizeLargeSegments()
for name, res := range results {
c.shouldEvictBySurrogate(name, previousCNs[name], *res)
}

return results, err
}

func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) {
if currentCN > previousCN || (previousCN > 0 && currentCN == -1) {
c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name))
}
}

func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool {
return c.wrapped.IsCached(name)
}
Loading

0 comments on commit 4836d47

Please sign in to comment.