From 4836d47b42f34966c94b9abc845a881dcec541a3 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Tue, 19 Nov 2024 19:17:15 -0300 Subject: [PATCH 1/8] LargeSegment caching implementation --- go.mod | 2 +- go.sum | 4 + splitio/proxy/caching/caching.go | 16 +- splitio/proxy/caching/mocks/mock.go | 182 ++++++++++++++++++++ splitio/proxy/caching/workers.go | 63 +++++++ splitio/proxy/caching/workers_test.go | 235 +++++++++++--------------- splitio/proxy/storage/splits.go | 5 + 7 files changed, 368 insertions(+), 139 deletions(-) create mode 100644 splitio/proxy/caching/mocks/mock.go diff --git a/go.mod b/go.mod index 35f9e160..4c93a7e8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.1 + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 9f6e2ecd..78072e6d 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,10 @@ github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IX github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY= github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489 h1:9sr63h4Kco1TSPtwaiECfRYfNvDMYtvQL2q4r62drDo= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 h1:Rzpm7u9uIaTsQDvSWRPsMbmBpZeg2kUQkVt0+30ubj4= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/proxy/caching/caching.go b/splitio/proxy/caching/caching.go index efae93cf..cc64e021 100644 --- a/splitio/proxy/caching/caching.go +++ b/splitio/proxy/caching/caching.go @@ -23,10 +23,17 @@ const ( AuthSurrogate = "au" segmentPrefix = "se::" + + largeSegmentPrefix = "ls::" ) const cacheSize = 1000000 +// MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried +func MakeSurrogateForLargeSegmentChanges(name string) string { + return largeSegmentPrefix + name +} + // MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried func MakeSurrogateForSegmentChanges(segmentName string) string { return segmentPrefix + segmentName @@ -38,6 +45,14 @@ func MakeSurrogateForMySegments(mysegments []dtos.MySegmentDTO) []string { return nil } +// MakeMembershipsEntries create a cache entry key for Memberships +func MakeMembershipsEntries(key string) []string { + return []string{ + "/api/memberships/" + key, + "gzip::/api/memberships/" + key, + } +} + // MakeMySegmentsEntry create a cache entry key for mysegments func MakeMySegmentsEntries(key string) []string { return []string{ @@ -60,7 +75,6 @@ func MakeProxyCache() *gincache.Middleware { } func keyFactoryFN(ctx *gin.Context) string { - var encodingPrefix string if strings.Contains(ctx.Request.Header.Get("Accept-Encoding"), "gzip") { encodingPrefix = "gzip::" diff --git a/splitio/proxy/caching/mocks/mock.go b/splitio/proxy/caching/mocks/mock.go new file mode 100644 index 00000000..06f3682f --- /dev/null +++ b/splitio/proxy/caching/mocks/mock.go @@ -0,0 +1,182 @@ +package mocks + +import ( + "github.com/splitio/gincache" + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/stretchr/testify/mock" +) + +// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there + +type SplitUpdaterMock struct { + mock.Mock +} + +// LocalKill implements split.Updater +func (s *SplitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) { + s.Called(splitName, defaultTreatment, changeNumber) +} + +// SynchronizeFeatureFlags implements split.Updater +func (s *SplitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { + args := s.Called(ffChange) + return args.Get(0).(*split.UpdateResult), args.Error(1) +} + +// SynchronizeSplits implements split.Updater +func (s *SplitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { + args := s.Called(till) + return args.Get(0).(*split.UpdateResult), args.Error(1) +} + +// ---- + +type CacheFlusherMock struct { + mock.Mock +} + +func (c *CacheFlusherMock) Evict(key string) { c.Called(key) } +func (c *CacheFlusherMock) EvictAll() { c.Called() } +func (c *CacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) } + +// --- + +type SplitStorageMock struct { + mock.Mock +} + +func (s *SplitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") } +func (s *SplitStorageMock) ChangeNumber() (int64, error) { + args := s.Called() + return args.Get(0).(int64), args.Error(1) +} + +func (*SplitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO { + panic("unimplemented") +} +func (*SplitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string { + panic("unimplemented") +} +func (*SplitStorageMock) GetAllFlagSetNames() []string { + panic("unimplemented") +} +func (*SplitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) { + panic("unimplemented") +} +func (s *SplitStorageMock) SegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} +func (s *SplitStorageMock) LargeSegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} +func (s *SplitStorageMock) SetChangeNumber(changeNumber int64) error { + return s.Called(changeNumber).Error(0) +} +func (*SplitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") } +func (*SplitStorageMock) SplitNames() []string { panic("unimplemented") } +func (*SplitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") } +func (*SplitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) { + panic("unimplemented") +} + +type SegmentUpdaterMock struct { + mock.Mock +} + +func (s *SegmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") } +func (s *SegmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") } + +func (s *SegmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { + args := s.Called(name, till) + return args.Get(0).(*segment.UpdateResult), args.Error(1) +} + +func (s *SegmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { + args := s.Called() + return args.Get(0).(map[string]segment.UpdateResult), args.Error(1) +} + +type SegmentStorageMock struct { + mock.Mock +} + +func (*SegmentStorageMock) SetChangeNumber(segmentName string, till int64) error { + panic("unimplemented") +} +func (s *SegmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error { + return s.Called(name, toAdd, toRemove, changeNumber).Error(0) +} + +// ChangeNumber implements storage.SegmentStorage +func (s *SegmentStorageMock) ChangeNumber(segmentName string) (int64, error) { + args := s.Called(segmentName) + return args.Get(0).(int64), args.Error(1) +} + +func (*SegmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") } +func (*SegmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) { + panic("unimplemented") +} +func (*SegmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") } + +// --- + +type LargeSegmentStorageMock struct { + mock.Mock +} + +func (s *LargeSegmentStorageMock) SetChangeNumber(name string, till int64) { + s.Called(name, till).Error(0) +} + +func (s *LargeSegmentStorageMock) Update(name string, userKeys []string, till int64) { + s.Called(name, userKeys, till) +} +func (s *LargeSegmentStorageMock) ChangeNumber(name string) int64 { + args := s.Called(name) + return args.Get(0).(int64) +} +func (s *LargeSegmentStorageMock) Count() int { + args := s.Called() + return args.Get(0).(int) +} +func (s *LargeSegmentStorageMock) LargeSegmentsForUser(userKey string) []string { + return []string{} +} +func (s *LargeSegmentStorageMock) ContainsKey(name string, key string) (bool, error) { + args := s.Called(name, key) + return args.Get(0).(bool), args.Error(1) +} + +// --- + +type LargeSegmentUpdaterMock struct { + mock.Mock +} + +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegment(name string, till *int64) (*int64, error) { + args := u.Called(name, till) + return args.Get(0).(*int64), args.Error(1) +} +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegments() (map[string]*int64, error) { + args := u.Called() + return args.Get(0).(map[string]*int64), args.Error(1) +} +func (u *LargeSegmentUpdaterMock) IsCached(name string) bool { + return false +} + +// --- + +var _ gincache.CacheFlusher = (*CacheFlusherMock)(nil) +var _ split.Updater = (*SplitUpdaterMock)(nil) +var _ segment.Updater = (*SegmentUpdaterMock)(nil) +var _ largesegment.Updater = (*LargeSegmentUpdaterMock)(nil) +var _ storage.SplitStorage = (*SplitStorageMock)(nil) +var _ storage.SegmentStorage = (*SegmentStorageMock)(nil) +var _ storage.LargeSegmentsStorage = (*LargeSegmentStorageMock)(nil) diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index deb9be13..3296dc0f 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -6,6 +6,7 @@ import ( "github.com/splitio/go-split-commons/v6/healthcheck/application" "github.com/splitio/go-split-commons/v6/service" "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" "github.com/splitio/go-toolkit/v5/logging" @@ -151,3 +152,65 @@ func (c *CacheAwareSegmentSynchronizer) SegmentNames() []interface{} { func (c *CacheAwareSegmentSynchronizer) IsSegmentCached(segmentName string) bool { return c.wrapped.IsSegmentCached(segmentName) } + +// CacheAwareLargeSegmentSynchronizer +type CacheAwareLargeSegmentSynchronizer struct { + wrapped largesegment.Updater + largeSegmentStorage storage.LargeSegmentsStorage + cacheFlusher gincache.CacheFlusher + splitStorage storage.SplitStorage +} + +func NewCacheAwareLargeSegmentSync( + splitStorage storage.SplitStorage, + largeSegmentStorage storage.LargeSegmentsStorage, + largeSegmentFetcher service.LargeSegmentFetcher, + logger logging.LoggerInterface, + runtimeTelemetry storage.TelemetryRuntimeProducer, + cacheFlusher gincache.CacheFlusher, + appMonitor application.MonitorProducerInterface, +) *CacheAwareLargeSegmentSynchronizer { + return &CacheAwareLargeSegmentSynchronizer{ + wrapped: largesegment.NewLargeSegmentUpdater(splitStorage, largeSegmentStorage, largeSegmentFetcher, logger, runtimeTelemetry, appMonitor), + cacheFlusher: cacheFlusher, + largeSegmentStorage: largeSegmentStorage, + splitStorage: splitStorage, + } +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string, till *int64) (*int64, error) { + previous := c.largeSegmentStorage.ChangeNumber(name) + newCN, err := c.wrapped.SynchronizeLargeSegment(name, till) + + c.shouldEvictBySurrogate(name, previous, *newCN) + + return newCN, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[string]*int64, error) { + previousLargeSegmentNames := c.splitStorage.LargeSegmentNames() + previousCNs := make(map[string]int64, previousLargeSegmentNames.Size()) + for _, name := range previousLargeSegmentNames.List() { + if strName, ok := name.(string); ok { + cn := c.largeSegmentStorage.ChangeNumber(strName) + previousCNs[strName] = cn + } + } + + results, err := c.wrapped.SynchronizeLargeSegments() + for name, res := range results { + c.shouldEvictBySurrogate(name, previousCNs[name], *res) + } + + return results, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) { + if currentCN > previousCN || (previousCN > 0 && currentCN == -1) { + c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name)) + } +} + +func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool { + return c.wrapped.IsCached(name) +} diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index ad19e847..ce9f094c 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -3,21 +3,19 @@ package caching import ( "testing" - "github.com/splitio/gincache" "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/storage" "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/split-synchronizer/v5/splitio/proxy/caching/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) func TestCacheAwareSplitSyncNoChanges(t *testing.T) { - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), error(nil)) - var cacheFlusherMock cacheFlusherMock - var storageMock splitStorageMock + var cacheFlusherMock mocks.CacheFlusherMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)) css := CacheAwareSplitSynchronizer{ @@ -36,13 +34,13 @@ func TestCacheAwareSplitSyncNoChanges(t *testing.T) { } func TestCacheAwareSplitSyncChanges(t *testing.T) { - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), error(nil)).Times(2) - var cacheFlusherMock cacheFlusherMock + var cacheFlusherMock mocks.CacheFlusherMock cacheFlusherMock.On("EvictBySurrogate", SplitSurrogate).Times(3) - var storageMock splitStorageMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once() storageMock.On("ChangeNumber").Return(int64(1), error(nil)).Once() @@ -75,13 +73,13 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) { // This test is used to test the new method. Eventually commons should be cleaned in order to have a single method for split-synchronization. // when that happens, either this or the previous test shold be removed - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeFeatureFlags", (*dtos.SplitChangeUpdate)(nil)).Return((*split.UpdateResult)(nil), error(nil)).Times(2) - var cacheFlusherMock cacheFlusherMock + var cacheFlusherMock mocks.CacheFlusherMock cacheFlusherMock.On("EvictBySurrogate", SplitSurrogate).Times(2) - var storageMock splitStorageMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once() storageMock.On("ChangeNumber").Return(int64(1), error(nil)).Once() @@ -108,14 +106,12 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) { } func TestCacheAwareSegmentSyncNoChanges(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{}, nil).Once() - var splitStorage splitStorageMock - - var cacheFlusher cacheFlusherMock - - var segmentStorage segmentStorageMock + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -136,20 +132,20 @@ func TestCacheAwareSegmentSyncNoChanges(t *testing.T) { } func TestCacheAwareSegmentSyncSingle(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{ UpdatedKeys: []string{"k1"}, NewChangeNumber: 2, }, nil).Once() - var splitStorage splitStorageMock + var splitStorage mocks.SplitStorageMock - var cacheFlusher cacheFlusherMock + var cacheFlusher mocks.CacheFlusherMock cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment1")).Times(2) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(2) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(2) - var segmentStorage segmentStorageMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -180,21 +176,21 @@ func TestCacheAwareSegmentSyncSingle(t *testing.T) { } func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegments").Return(map[string]segment.UpdateResult{"segment2": { UpdatedKeys: []string{"k1"}, NewChangeNumber: 1, }}, nil).Once() - var splitStorage splitStorageMock + var splitStorage mocks.SplitStorageMock splitStorage.On("SegmentNames").Return(set.NewSet("segment2")).Once() - var cacheFlusher cacheFlusherMock + var cacheFlusher mocks.CacheFlusherMock cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment2")).Times(1) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(3) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(3) - var segmentStorage segmentStorageMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment2").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -238,142 +234,107 @@ func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { cacheFlusher.AssertExpectations(t) } -// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there +// CacheAwareLargeSegmentSynchronizer +func TestSynchronizeLargeSegment(t *testing.T) { + lsName := "largeSegment1" -type splitUpdaterMock struct { - mock.Mock -} + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges(lsName)).Once() -// LocalKill implements split.Updater -func (s *splitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) { - s.Called(splitName, defaultTreatment, changeNumber) -} + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(-1)).Once() -// SynchronizeFeatureFlags implements split.Updater -func (s *splitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { - args := s.Called(ffChange) - return args.Get(0).(*split.UpdateResult), args.Error(1) -} + var lsUpdater mocks.LargeSegmentUpdaterMock + cnToReturn := int64(100) + lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return(&cnToReturn, nil).Once() -// SynchronizeSplits implements split.Updater -func (s *splitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { - args := s.Called(till) - return args.Get(0).(*split.UpdateResult), args.Error(1) -} + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, + } -// ---- + cn, err := clsSync.SynchronizeLargeSegment(lsName, nil) + if err != nil { + t.Error("Error should be nil. Actual: ", err) + } -type cacheFlusherMock struct { - mock.Mock + if *cn != 100 { + t.Error("ChangeNumber should be 100. Actual: ", *cn) + } } -func (c *cacheFlusherMock) Evict(key string) { c.Called(key) } -func (c *cacheFlusherMock) EvictAll() { c.Called() } -func (c *cacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) } +func TestSynchronizeLargeSegmentHighestPrevious(t *testing.T) { + lsName := "largeSegment1" -// --- + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges(lsName)).Times(0) -type splitStorageMock struct { - mock.Mock -} + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(200)).Once() -func (s *splitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") } -func (s *splitStorageMock) ChangeNumber() (int64, error) { - args := s.Called() - return args.Get(0).(int64), args.Error(1) -} + var lsUpdater mocks.LargeSegmentUpdaterMock + cnToReturn := int64(100) + lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return(&cnToReturn, nil).Once() -func (*splitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO { - panic("unimplemented") -} -func (*splitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string { - panic("unimplemented") -} -func (*splitStorageMock) GetAllFlagSetNames() []string { - panic("unimplemented") -} -func (*splitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) { - panic("unimplemented") -} -func (s *splitStorageMock) SegmentNames() *set.ThreadUnsafeSet { - return s.Called().Get(0).(*set.ThreadUnsafeSet) -} -func (s *splitStorageMock) SetChangeNumber(changeNumber int64) error { - return s.Called(changeNumber).Error(0) -} -func (*splitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") } -func (*splitStorageMock) SplitNames() []string { panic("unimplemented") } -func (*splitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") } -func (*splitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) { - panic("unimplemented") -} - -type segmentUpdaterMock struct { - mock.Mock -} - -func (s *segmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") } -func (s *segmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") } + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, + } -func (s *segmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { - args := s.Called(name, till) - return args.Get(0).(*segment.UpdateResult), args.Error(1) -} + cn, err := clsSync.SynchronizeLargeSegment(lsName, nil) + if err != nil { + t.Error("Error should be nil. Actual: ", err) + } -func (s *segmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { - args := s.Called() - return args.Get(0).(map[string]segment.UpdateResult), args.Error(1) + if *cn != 100 { + t.Error("ChangeNumber should be 100. Actual: ", *cn) + } } -type segmentStorageMock struct { - mock.Mock -} +func TestSynchronizeLargeSegments(t *testing.T) { + var splitStorage mocks.SplitStorageMock + splitStorage.On("LargeSegmentNames").Return(set.NewSet("ls1", "ls2")) -func (*segmentStorageMock) SetChangeNumber(segmentName string, till int64) error { - panic("unimplemented") -} -func (s *segmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error { - return s.Called(name, toAdd, toRemove, changeNumber).Error(0) -} + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges("ls1")).Once() + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges("ls2")).Once() -// ChangeNumber implements storage.SegmentStorage -func (s *segmentStorageMock) ChangeNumber(segmentName string) (int64, error) { - args := s.Called(segmentName) - return args.Get(0).(int64), args.Error(1) -} + var cn1 int64 = 100 + var cn2 int64 = 200 + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", "ls1").Return(cn1).Once() + largeSegmentStorage.On("ChangeNumber", "ls2").Return(cn2).Once() -func (*segmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") } -func (*segmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) { - panic("unimplemented") -} -func (*segmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") } - -/* - type segmentUpdaterMock struct { - SynchronizeSegmentCall func(name string, till *int64) (*segment.UpdateResult, error) - SynchronizeSegmentsCall func() (map[string]segment.UpdateResult, error) - SegmentNamesCall func() []interface{} - IsSegmentCachedCall func(segmentName string) bool + var lsUpdater mocks.LargeSegmentUpdaterMock + result := map[string]*int64{ + "ls1": &cn1, + "ls2": &cn2, } + lsUpdater.On("SynchronizeLargeSegments").Return(result, nil).Once() - func (s *segmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { - return s.SynchronizeSegmentCall(name, till) + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, } - func (s *segmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { - return s.SynchronizeSegmentsCall() + cn, err := clsSync.SynchronizeLargeSegments() + if err != nil { + t.Error("Error should be nil. Actual: ", err) } - func (s *segmentUpdaterMock) SegmentNames() []interface{} { - return s.SegmentNamesCall() + if *cn["ls1"] != cn1 { + t.Error("ChangeNumber should be 100. Actual: ", *cn["ls1"]) } - func (s *segmentUpdaterMock) IsSegmentCached(segmentName string) bool { - return s.IsSegmentCachedCall(segmentName) + if *cn["ls2"] != cn2 { + t.Error("ChangeNumber should be 200. Actual: ", *cn["ls2"]) } -*/ -var _ split.Updater = (*splitUpdaterMock)(nil) -var _ storage.SplitStorage = (*splitStorageMock)(nil) -var _ gincache.CacheFlusher = (*cacheFlusherMock)(nil) -var _ segment.Updater = (*segmentUpdaterMock)(nil) -var _ storage.SegmentStorage = (*segmentStorageMock)(nil) +} diff --git a/splitio/proxy/storage/splits.go b/splitio/proxy/storage/splits.go index 9b3f2f8c..2d6fa6e6 100644 --- a/splitio/proxy/storage/splits.go +++ b/splitio/proxy/storage/splits.go @@ -156,6 +156,11 @@ func (p *ProxySplitStorageImpl) FetchMany(names []string) map[string]*dtos.Split // SegmentNames call is forwarded to the snapshot func (p *ProxySplitStorageImpl) SegmentNames() *set.ThreadUnsafeSet { return p.snapshot.SegmentNames() } +// LargeSegmentNames call is forwarded to the snapshot +func (p *ProxySplitStorageImpl) LargeSegmentNames() *set.ThreadUnsafeSet { + return p.snapshot.LargeSegmentNames() +} + // Split call is forwarded to the snapshot func (p *ProxySplitStorageImpl) Split(name string) *dtos.SplitDTO { return p.snapshot.Split(name) } From b8237b943ddbb8d0ae28ff84e905c85da51b9694 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Wed, 20 Nov 2024 10:32:54 -0300 Subject: [PATCH 2/8] adding AssertExpectations --- splitio/proxy/caching/workers_test.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index ce9f094c..eb3f9bf0 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -264,6 +264,10 @@ func TestSynchronizeLargeSegment(t *testing.T) { if *cn != 100 { t.Error("ChangeNumber should be 100. Actual: ", *cn) } + + cacheFlusher.AssertExpectations(t) + largeSegmentStorage.AssertExpectations(t) + lsUpdater.AssertExpectations(t) } func TestSynchronizeLargeSegmentHighestPrevious(t *testing.T) { @@ -271,7 +275,6 @@ func TestSynchronizeLargeSegmentHighestPrevious(t *testing.T) { var splitStorage mocks.SplitStorageMock var cacheFlusher mocks.CacheFlusherMock - cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges(lsName)).Times(0) var largeSegmentStorage mocks.LargeSegmentStorageMock largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(200)).Once() @@ -295,6 +298,11 @@ func TestSynchronizeLargeSegmentHighestPrevious(t *testing.T) { if *cn != 100 { t.Error("ChangeNumber should be 100. Actual: ", *cn) } + + splitStorage.AssertExpectations(t) + cacheFlusher.AssertExpectations(t) + largeSegmentStorage.AssertExpectations(t) + lsUpdater.AssertExpectations(t) } func TestSynchronizeLargeSegments(t *testing.T) { @@ -337,4 +345,9 @@ func TestSynchronizeLargeSegments(t *testing.T) { if *cn["ls2"] != cn2 { t.Error("ChangeNumber should be 200. Actual: ", *cn["ls2"]) } + + splitStorage.AssertExpectations(t) + cacheFlusher.AssertExpectations(t) + largeSegmentStorage.AssertExpectations(t) + lsUpdater.AssertExpectations(t) } From 21a8c3f88e8d8c691a09f7c7729e938fac431142 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Wed, 20 Nov 2024 10:49:15 -0300 Subject: [PATCH 3/8] fixing test --- splitio/proxy/caching/workers.go | 5 ++++- splitio/proxy/caching/workers_test.go | 4 ++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index 3296dc0f..eab8da66 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -1,6 +1,8 @@ package caching import ( + "fmt" + "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/flagsets" "github.com/splitio/go-split-commons/v6/healthcheck/application" @@ -206,7 +208,8 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[str } func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) { - if currentCN > previousCN || (previousCN > 0 && currentCN == -1) { + if currentCN > previousCN || currentCN == -1 { + fmt.Println("aca") c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name)) } } diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index eb3f9bf0..8f50c625 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -316,8 +316,8 @@ func TestSynchronizeLargeSegments(t *testing.T) { var cn1 int64 = 100 var cn2 int64 = 200 var largeSegmentStorage mocks.LargeSegmentStorageMock - largeSegmentStorage.On("ChangeNumber", "ls1").Return(cn1).Once() - largeSegmentStorage.On("ChangeNumber", "ls2").Return(cn2).Once() + largeSegmentStorage.On("ChangeNumber", "ls1").Return(cn1 - 50).Once() + largeSegmentStorage.On("ChangeNumber", "ls2").Return(cn2 - 50).Once() var lsUpdater mocks.LargeSegmentUpdaterMock result := map[string]*int64{ From c7bc7b4054e2a84a5f2cc5ce8eeb869b399f088d Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Wed, 20 Nov 2024 11:10:40 -0300 Subject: [PATCH 4/8] removing println --- splitio/proxy/caching/workers.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index eab8da66..e7b01c19 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -1,8 +1,6 @@ package caching import ( - "fmt" - "github.com/splitio/go-split-commons/v6/dtos" "github.com/splitio/go-split-commons/v6/flagsets" "github.com/splitio/go-split-commons/v6/healthcheck/application" @@ -209,7 +207,6 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[str func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) { if currentCN > previousCN || currentCN == -1 { - fmt.Println("aca") c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name)) } } From a7e24cf8d632a271ec45df4d37bcdfdef9de59bd Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Fri, 22 Nov 2024 16:29:48 -0300 Subject: [PATCH 5/8] polishing --- go.mod | 2 +- go.sum | 8 ++------ splitio/proxy/caching/mocks/mock.go | 8 ++++++-- splitio/proxy/caching/workers.go | 9 +++++++++ 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 4c93a7e8..5c72c786 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241122192419-cc8d0413fa8e github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 78072e6d..b4885fc2 100644 --- a/go.sum +++ b/go.sum @@ -93,12 +93,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU= github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= -github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY= -github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= -github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489 h1:9sr63h4Kco1TSPtwaiECfRYfNvDMYtvQL2q4r62drDo= -github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= -github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 h1:Rzpm7u9uIaTsQDvSWRPsMbmBpZeg2kUQkVt0+30ubj4= -github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241122192419-cc8d0413fa8e h1:0PMjUqkHtruM6arvxNbC3Yu6goqdkL/FuYeoWNmQ3K4= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241122192419-cc8d0413fa8e/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/proxy/caching/mocks/mock.go b/splitio/proxy/caching/mocks/mock.go index 06f3682f..27a1224b 100644 --- a/splitio/proxy/caching/mocks/mock.go +++ b/splitio/proxy/caching/mocks/mock.go @@ -148,7 +148,7 @@ func (s *LargeSegmentStorageMock) Count() int { func (s *LargeSegmentStorageMock) LargeSegmentsForUser(userKey string) []string { return []string{} } -func (s *LargeSegmentStorageMock) ContainsKey(name string, key string) (bool, error) { +func (s *LargeSegmentStorageMock) IsInLargeSegment(name string, key string) (bool, error) { args := s.Called(name, key) return args.Get(0).(bool), args.Error(1) } @@ -168,7 +168,11 @@ func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegments() (map[string]*int64, return args.Get(0).(map[string]*int64), args.Error(1) } func (u *LargeSegmentUpdaterMock) IsCached(name string) bool { - return false + return u.Called().Get(0).(bool) +} +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) { + args := u.Called(lsRFDResponseDTO) + return args.Get(0).(*int64), args.Error(1) } // --- diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index e7b01c19..a3d1edfa 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -214,3 +214,12 @@ func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool { return c.wrapped.IsCached(name) } + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) { + previous := c.largeSegmentStorage.ChangeNumber(lsRFDResponseDTO.Name) + newCN, err := c.wrapped.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO) + + c.shouldEvictBySurrogate(lsRFDResponseDTO.Name, previous, *newCN) + + return newCN, err +} From 01b0079d98aed1bb7f91e0fdf246ebc595abe6c5 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Mon, 25 Nov 2024 14:57:36 -0300 Subject: [PATCH 6/8] wip --- go.mod | 4 +++- go.sum | 2 -- splitio/proxy/caching/mocks/mock.go | 3 +++ splitio/proxy/caching/workers.go | 2 +- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 5c72c786..55530ba1 100644 --- a/go.mod +++ b/go.mod @@ -8,13 +8,15 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.2-0.20241122192419-cc8d0413fa8e + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68 github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 golang.org/x/exp v0.0.0-20231006140011-7918f672742d ) +replace github.com/splitio/go-split-commons/v6 => /Users/maurosanz/go/src/github/splitio/go-split-commons + require ( github.com/bits-and-blooms/bitset v1.3.1 // indirect github.com/bits-and-blooms/bloom/v3 v3.3.1 // indirect diff --git a/go.sum b/go.sum index b4885fc2..c6504405 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,6 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU= github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= -github.com/splitio/go-split-commons/v6 v6.0.2-0.20241122192419-cc8d0413fa8e h1:0PMjUqkHtruM6arvxNbC3Yu6goqdkL/FuYeoWNmQ3K4= -github.com/splitio/go-split-commons/v6 v6.0.2-0.20241122192419-cc8d0413fa8e/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/proxy/caching/mocks/mock.go b/splitio/proxy/caching/mocks/mock.go index 27a1224b..d3ef093d 100644 --- a/splitio/proxy/caching/mocks/mock.go +++ b/splitio/proxy/caching/mocks/mock.go @@ -152,6 +152,9 @@ func (s *LargeSegmentStorageMock) IsInLargeSegment(name string, key string) (boo args := s.Called(name, key) return args.Get(0).(bool), args.Error(1) } +func (s *LargeSegmentStorageMock) TotalKeys(name string) int { + return s.Called(name).Get(0).(int) +} // --- diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index a3d1edfa..682dc466 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -206,7 +206,7 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[str } func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) { - if currentCN > previousCN || currentCN == -1 { + if currentCN > previousCN || (previousCN != -1 && currentCN == -1) { c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name)) } } From 569561b8717b72a00ba6cb2b1d0a7373d835994a Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Tue, 26 Nov 2024 16:09:58 -0300 Subject: [PATCH 7/8] adding purge stragegy --- go.mod | 4 +--- go.sum | 2 ++ splitio/proxy/caching/caching.go | 18 +++--------------- splitio/proxy/caching/workers.go | 20 ++++++++++---------- splitio/proxy/caching/workers_test.go | 5 ++--- 5 files changed, 18 insertions(+), 31 deletions(-) diff --git a/go.mod b/go.mod index 55530ba1..7d7e7746 100644 --- a/go.mod +++ b/go.mod @@ -8,15 +8,13 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.2-0.20241125153044-959311072c68 + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 golang.org/x/exp v0.0.0-20231006140011-7918f672742d ) -replace github.com/splitio/go-split-commons/v6 => /Users/maurosanz/go/src/github/splitio/go-split-commons - require ( github.com/bits-and-blooms/bitset v1.3.1 // indirect github.com/bits-and-blooms/bloom/v3 v3.3.1 // indirect diff --git a/go.sum b/go.sum index c6504405..d7333a81 100644 --- a/go.sum +++ b/go.sum @@ -93,6 +93,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU= github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f h1:FbR7KHn2kdtBda7CZVxut2qfFH7ZI4XW+FV6B0JpWgM= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/proxy/caching/caching.go b/splitio/proxy/caching/caching.go index cc64e021..665731af 100644 --- a/splitio/proxy/caching/caching.go +++ b/splitio/proxy/caching/caching.go @@ -19,21 +19,17 @@ const ( // SplitSurrogate key (we only need one, since all splitChanges should be expired when an update is processed) SplitSurrogate = "sp" + // LargeSegmentSurrogate key (we only need one, since all memberships should be expired when an update is processed) + LargeSegmentSurrogate = "ls" + // AuthSurrogate key (having push disabled, it's safe to cache this and return it on all requests) AuthSurrogate = "au" segmentPrefix = "se::" - - largeSegmentPrefix = "ls::" ) const cacheSize = 1000000 -// MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried -func MakeSurrogateForLargeSegmentChanges(name string) string { - return largeSegmentPrefix + name -} - // MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried func MakeSurrogateForSegmentChanges(segmentName string) string { return segmentPrefix + segmentName @@ -45,14 +41,6 @@ func MakeSurrogateForMySegments(mysegments []dtos.MySegmentDTO) []string { return nil } -// MakeMembershipsEntries create a cache entry key for Memberships -func MakeMembershipsEntries(key string) []string { - return []string{ - "/api/memberships/" + key, - "gzip::/api/memberships/" + key, - } -} - // MakeMySegmentsEntry create a cache entry key for mysegments func MakeMySegmentsEntries(key string) []string { return []string{ diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index 682dc466..b3d16827 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -182,7 +182,7 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string previous := c.largeSegmentStorage.ChangeNumber(name) newCN, err := c.wrapped.SynchronizeLargeSegment(name, till) - c.shouldEvictBySurrogate(name, previous, *newCN) + c.shouldEvictBySurrogate(previous, *newCN) return newCN, err } @@ -198,19 +198,13 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[str } results, err := c.wrapped.SynchronizeLargeSegments() - for name, res := range results { - c.shouldEvictBySurrogate(name, previousCNs[name], *res) + for name, currentCN := range results { + c.shouldEvictBySurrogate(previousCNs[name], *currentCN) } return results, err } -func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) { - if currentCN > previousCN || (previousCN != -1 && currentCN == -1) { - c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name)) - } -} - func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool { return c.wrapped.IsCached(name) } @@ -219,7 +213,13 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegmentUpdate(lsRFD previous := c.largeSegmentStorage.ChangeNumber(lsRFDResponseDTO.Name) newCN, err := c.wrapped.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO) - c.shouldEvictBySurrogate(lsRFDResponseDTO.Name, previous, *newCN) + c.shouldEvictBySurrogate(previous, *newCN) return newCN, err } + +func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(previousCN int64, currentCN int64) { + if currentCN > previousCN || currentCN == -1 { + c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) + } +} diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index 8f50c625..2c83d49a 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -240,7 +240,7 @@ func TestSynchronizeLargeSegment(t *testing.T) { var splitStorage mocks.SplitStorageMock var cacheFlusher mocks.CacheFlusherMock - cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges(lsName)).Once() + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Once() var largeSegmentStorage mocks.LargeSegmentStorageMock largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(-1)).Once() @@ -310,8 +310,7 @@ func TestSynchronizeLargeSegments(t *testing.T) { splitStorage.On("LargeSegmentNames").Return(set.NewSet("ls1", "ls2")) var cacheFlusher mocks.CacheFlusherMock - cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges("ls1")).Once() - cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges("ls2")).Once() + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Times(2) var cn1 int64 = 100 var cn2 int64 = 200 From 028f2e3fb73b36def63e83f9d6c1e94b0c7097f5 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Wed, 27 Nov 2024 14:11:44 -0300 Subject: [PATCH 8/8] pr feedback --- splitio/proxy/caching/workers.go | 10 ++++++---- splitio/proxy/caching/workers_test.go | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index b3d16827..52dc03ab 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -99,6 +99,7 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegment(name string, till *in result, err := c.wrapped.SynchronizeSegment(name, till) if current := result.NewChangeNumber; current > previous || (previous != -1 && current == -1) { c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(name)) + c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) } // remove individual entries for each affected key @@ -130,6 +131,7 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegments() (map[string]segmen if pcn, _ := previousCNs[segmentName]; ccn > pcn || (pcn > 0 && ccn == -1) { // if the segment was updated or the segment was removed, evict it c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(segmentName)) + c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) } for idx := range result.UpdatedKeys { @@ -182,7 +184,7 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string previous := c.largeSegmentStorage.ChangeNumber(name) newCN, err := c.wrapped.SynchronizeLargeSegment(name, till) - c.shouldEvictBySurrogate(previous, *newCN) + c.evictByLargeSegmentSurrogate(previous, *newCN) return newCN, err } @@ -199,7 +201,7 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[str results, err := c.wrapped.SynchronizeLargeSegments() for name, currentCN := range results { - c.shouldEvictBySurrogate(previousCNs[name], *currentCN) + c.evictByLargeSegmentSurrogate(previousCNs[name], *currentCN) } return results, err @@ -213,12 +215,12 @@ func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegmentUpdate(lsRFD previous := c.largeSegmentStorage.ChangeNumber(lsRFDResponseDTO.Name) newCN, err := c.wrapped.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO) - c.shouldEvictBySurrogate(previous, *newCN) + c.evictByLargeSegmentSurrogate(previous, *newCN) return newCN, err } -func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(previousCN int64, currentCN int64) { +func (c *CacheAwareLargeSegmentSynchronizer) evictByLargeSegmentSurrogate(previousCN int64, currentCN int64) { if currentCN > previousCN || currentCN == -1 { c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) } diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index 2c83d49a..4eaf70fe 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -144,6 +144,7 @@ func TestCacheAwareSegmentSyncSingle(t *testing.T) { cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment1")).Times(2) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(2) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(2) + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Times(2) var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() @@ -189,6 +190,7 @@ func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment2")).Times(1) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(3) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(3) + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Times(3) var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment2").Return(int64(0), nil).Once()