From 4836d47b42f34966c94b9abc845a881dcec541a3 Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Tue, 19 Nov 2024 19:17:15 -0300 Subject: [PATCH] LargeSegment caching implementation --- go.mod | 2 +- go.sum | 4 + splitio/proxy/caching/caching.go | 16 +- splitio/proxy/caching/mocks/mock.go | 182 ++++++++++++++++++++ splitio/proxy/caching/workers.go | 63 +++++++ splitio/proxy/caching/workers_test.go | 235 +++++++++++--------------- splitio/proxy/storage/splits.go | 5 + 7 files changed, 368 insertions(+), 139 deletions(-) create mode 100644 splitio/proxy/caching/mocks/mock.go diff --git a/go.mod b/go.mod index 35f9e160..4c93a7e8 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.1 + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 9f6e2ecd..78072e6d 100644 --- a/go.sum +++ b/go.sum @@ -95,6 +95,10 @@ github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IX github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY= github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489 h1:9sr63h4Kco1TSPtwaiECfRYfNvDMYtvQL2q4r62drDo= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241115210219-e8964f13d489/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941 h1:Rzpm7u9uIaTsQDvSWRPsMbmBpZeg2kUQkVt0+30ubj4= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241119171952-0b46694a2941/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/proxy/caching/caching.go b/splitio/proxy/caching/caching.go index efae93cf..cc64e021 100644 --- a/splitio/proxy/caching/caching.go +++ b/splitio/proxy/caching/caching.go @@ -23,10 +23,17 @@ const ( AuthSurrogate = "au" segmentPrefix = "se::" + + largeSegmentPrefix = "ls::" ) const cacheSize = 1000000 +// MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried +func MakeSurrogateForLargeSegmentChanges(name string) string { + return largeSegmentPrefix + name +} + // MakeSurrogateForSegmentChanges creates a surrogate key for the segment being queried func MakeSurrogateForSegmentChanges(segmentName string) string { return segmentPrefix + segmentName @@ -38,6 +45,14 @@ func MakeSurrogateForMySegments(mysegments []dtos.MySegmentDTO) []string { return nil } +// MakeMembershipsEntries create a cache entry key for Memberships +func MakeMembershipsEntries(key string) []string { + return []string{ + "/api/memberships/" + key, + "gzip::/api/memberships/" + key, + } +} + // MakeMySegmentsEntry create a cache entry key for mysegments func MakeMySegmentsEntries(key string) []string { return []string{ @@ -60,7 +75,6 @@ func MakeProxyCache() *gincache.Middleware { } func keyFactoryFN(ctx *gin.Context) string { - var encodingPrefix string if strings.Contains(ctx.Request.Header.Get("Accept-Encoding"), "gzip") { encodingPrefix = "gzip::" diff --git a/splitio/proxy/caching/mocks/mock.go b/splitio/proxy/caching/mocks/mock.go new file mode 100644 index 00000000..06f3682f --- /dev/null +++ b/splitio/proxy/caching/mocks/mock.go @@ -0,0 +1,182 @@ +package mocks + +import ( + "github.com/splitio/gincache" + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/stretchr/testify/mock" +) + +// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there + +type SplitUpdaterMock struct { + mock.Mock +} + +// LocalKill implements split.Updater +func (s *SplitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) { + s.Called(splitName, defaultTreatment, changeNumber) +} + +// SynchronizeFeatureFlags implements split.Updater +func (s *SplitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { + args := s.Called(ffChange) + return args.Get(0).(*split.UpdateResult), args.Error(1) +} + +// SynchronizeSplits implements split.Updater +func (s *SplitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { + args := s.Called(till) + return args.Get(0).(*split.UpdateResult), args.Error(1) +} + +// ---- + +type CacheFlusherMock struct { + mock.Mock +} + +func (c *CacheFlusherMock) Evict(key string) { c.Called(key) } +func (c *CacheFlusherMock) EvictAll() { c.Called() } +func (c *CacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) } + +// --- + +type SplitStorageMock struct { + mock.Mock +} + +func (s *SplitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") } +func (s *SplitStorageMock) ChangeNumber() (int64, error) { + args := s.Called() + return args.Get(0).(int64), args.Error(1) +} + +func (*SplitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO { + panic("unimplemented") +} +func (*SplitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string { + panic("unimplemented") +} +func (*SplitStorageMock) GetAllFlagSetNames() []string { + panic("unimplemented") +} +func (*SplitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) { + panic("unimplemented") +} +func (s *SplitStorageMock) SegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} +func (s *SplitStorageMock) LargeSegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} +func (s *SplitStorageMock) SetChangeNumber(changeNumber int64) error { + return s.Called(changeNumber).Error(0) +} +func (*SplitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") } +func (*SplitStorageMock) SplitNames() []string { panic("unimplemented") } +func (*SplitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") } +func (*SplitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) { + panic("unimplemented") +} + +type SegmentUpdaterMock struct { + mock.Mock +} + +func (s *SegmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") } +func (s *SegmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") } + +func (s *SegmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { + args := s.Called(name, till) + return args.Get(0).(*segment.UpdateResult), args.Error(1) +} + +func (s *SegmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { + args := s.Called() + return args.Get(0).(map[string]segment.UpdateResult), args.Error(1) +} + +type SegmentStorageMock struct { + mock.Mock +} + +func (*SegmentStorageMock) SetChangeNumber(segmentName string, till int64) error { + panic("unimplemented") +} +func (s *SegmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error { + return s.Called(name, toAdd, toRemove, changeNumber).Error(0) +} + +// ChangeNumber implements storage.SegmentStorage +func (s *SegmentStorageMock) ChangeNumber(segmentName string) (int64, error) { + args := s.Called(segmentName) + return args.Get(0).(int64), args.Error(1) +} + +func (*SegmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") } +func (*SegmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) { + panic("unimplemented") +} +func (*SegmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") } + +// --- + +type LargeSegmentStorageMock struct { + mock.Mock +} + +func (s *LargeSegmentStorageMock) SetChangeNumber(name string, till int64) { + s.Called(name, till).Error(0) +} + +func (s *LargeSegmentStorageMock) Update(name string, userKeys []string, till int64) { + s.Called(name, userKeys, till) +} +func (s *LargeSegmentStorageMock) ChangeNumber(name string) int64 { + args := s.Called(name) + return args.Get(0).(int64) +} +func (s *LargeSegmentStorageMock) Count() int { + args := s.Called() + return args.Get(0).(int) +} +func (s *LargeSegmentStorageMock) LargeSegmentsForUser(userKey string) []string { + return []string{} +} +func (s *LargeSegmentStorageMock) ContainsKey(name string, key string) (bool, error) { + args := s.Called(name, key) + return args.Get(0).(bool), args.Error(1) +} + +// --- + +type LargeSegmentUpdaterMock struct { + mock.Mock +} + +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegment(name string, till *int64) (*int64, error) { + args := u.Called(name, till) + return args.Get(0).(*int64), args.Error(1) +} +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegments() (map[string]*int64, error) { + args := u.Called() + return args.Get(0).(map[string]*int64), args.Error(1) +} +func (u *LargeSegmentUpdaterMock) IsCached(name string) bool { + return false +} + +// --- + +var _ gincache.CacheFlusher = (*CacheFlusherMock)(nil) +var _ split.Updater = (*SplitUpdaterMock)(nil) +var _ segment.Updater = (*SegmentUpdaterMock)(nil) +var _ largesegment.Updater = (*LargeSegmentUpdaterMock)(nil) +var _ storage.SplitStorage = (*SplitStorageMock)(nil) +var _ storage.SegmentStorage = (*SegmentStorageMock)(nil) +var _ storage.LargeSegmentsStorage = (*LargeSegmentStorageMock)(nil) diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index deb9be13..3296dc0f 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -6,6 +6,7 @@ import ( "github.com/splitio/go-split-commons/v6/healthcheck/application" "github.com/splitio/go-split-commons/v6/service" "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" "github.com/splitio/go-toolkit/v5/logging" @@ -151,3 +152,65 @@ func (c *CacheAwareSegmentSynchronizer) SegmentNames() []interface{} { func (c *CacheAwareSegmentSynchronizer) IsSegmentCached(segmentName string) bool { return c.wrapped.IsSegmentCached(segmentName) } + +// CacheAwareLargeSegmentSynchronizer +type CacheAwareLargeSegmentSynchronizer struct { + wrapped largesegment.Updater + largeSegmentStorage storage.LargeSegmentsStorage + cacheFlusher gincache.CacheFlusher + splitStorage storage.SplitStorage +} + +func NewCacheAwareLargeSegmentSync( + splitStorage storage.SplitStorage, + largeSegmentStorage storage.LargeSegmentsStorage, + largeSegmentFetcher service.LargeSegmentFetcher, + logger logging.LoggerInterface, + runtimeTelemetry storage.TelemetryRuntimeProducer, + cacheFlusher gincache.CacheFlusher, + appMonitor application.MonitorProducerInterface, +) *CacheAwareLargeSegmentSynchronizer { + return &CacheAwareLargeSegmentSynchronizer{ + wrapped: largesegment.NewLargeSegmentUpdater(splitStorage, largeSegmentStorage, largeSegmentFetcher, logger, runtimeTelemetry, appMonitor), + cacheFlusher: cacheFlusher, + largeSegmentStorage: largeSegmentStorage, + splitStorage: splitStorage, + } +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string, till *int64) (*int64, error) { + previous := c.largeSegmentStorage.ChangeNumber(name) + newCN, err := c.wrapped.SynchronizeLargeSegment(name, till) + + c.shouldEvictBySurrogate(name, previous, *newCN) + + return newCN, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[string]*int64, error) { + previousLargeSegmentNames := c.splitStorage.LargeSegmentNames() + previousCNs := make(map[string]int64, previousLargeSegmentNames.Size()) + for _, name := range previousLargeSegmentNames.List() { + if strName, ok := name.(string); ok { + cn := c.largeSegmentStorage.ChangeNumber(strName) + previousCNs[strName] = cn + } + } + + results, err := c.wrapped.SynchronizeLargeSegments() + for name, res := range results { + c.shouldEvictBySurrogate(name, previousCNs[name], *res) + } + + return results, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) shouldEvictBySurrogate(name string, previousCN int64, currentCN int64) { + if currentCN > previousCN || (previousCN > 0 && currentCN == -1) { + c.cacheFlusher.EvictBySurrogate(MakeSurrogateForLargeSegmentChanges(name)) + } +} + +func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool { + return c.wrapped.IsCached(name) +} diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index ad19e847..ce9f094c 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -3,21 +3,19 @@ package caching import ( "testing" - "github.com/splitio/gincache" "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/storage" "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/split-synchronizer/v5/splitio/proxy/caching/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) func TestCacheAwareSplitSyncNoChanges(t *testing.T) { - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), error(nil)) - var cacheFlusherMock cacheFlusherMock - var storageMock splitStorageMock + var cacheFlusherMock mocks.CacheFlusherMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)) css := CacheAwareSplitSynchronizer{ @@ -36,13 +34,13 @@ func TestCacheAwareSplitSyncNoChanges(t *testing.T) { } func TestCacheAwareSplitSyncChanges(t *testing.T) { - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), error(nil)).Times(2) - var cacheFlusherMock cacheFlusherMock + var cacheFlusherMock mocks.CacheFlusherMock cacheFlusherMock.On("EvictBySurrogate", SplitSurrogate).Times(3) - var storageMock splitStorageMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once() storageMock.On("ChangeNumber").Return(int64(1), error(nil)).Once() @@ -75,13 +73,13 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) { // This test is used to test the new method. Eventually commons should be cleaned in order to have a single method for split-synchronization. // when that happens, either this or the previous test shold be removed - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeFeatureFlags", (*dtos.SplitChangeUpdate)(nil)).Return((*split.UpdateResult)(nil), error(nil)).Times(2) - var cacheFlusherMock cacheFlusherMock + var cacheFlusherMock mocks.CacheFlusherMock cacheFlusherMock.On("EvictBySurrogate", SplitSurrogate).Times(2) - var storageMock splitStorageMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once() storageMock.On("ChangeNumber").Return(int64(1), error(nil)).Once() @@ -108,14 +106,12 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) { } func TestCacheAwareSegmentSyncNoChanges(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{}, nil).Once() - var splitStorage splitStorageMock - - var cacheFlusher cacheFlusherMock - - var segmentStorage segmentStorageMock + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -136,20 +132,20 @@ func TestCacheAwareSegmentSyncNoChanges(t *testing.T) { } func TestCacheAwareSegmentSyncSingle(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{ UpdatedKeys: []string{"k1"}, NewChangeNumber: 2, }, nil).Once() - var splitStorage splitStorageMock + var splitStorage mocks.SplitStorageMock - var cacheFlusher cacheFlusherMock + var cacheFlusher mocks.CacheFlusherMock cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment1")).Times(2) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(2) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(2) - var segmentStorage segmentStorageMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -180,21 +176,21 @@ func TestCacheAwareSegmentSyncSingle(t *testing.T) { } func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegments").Return(map[string]segment.UpdateResult{"segment2": { UpdatedKeys: []string{"k1"}, NewChangeNumber: 1, }}, nil).Once() - var splitStorage splitStorageMock + var splitStorage mocks.SplitStorageMock splitStorage.On("SegmentNames").Return(set.NewSet("segment2")).Once() - var cacheFlusher cacheFlusherMock + var cacheFlusher mocks.CacheFlusherMock cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment2")).Times(1) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(3) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(3) - var segmentStorage segmentStorageMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment2").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -238,142 +234,107 @@ func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { cacheFlusher.AssertExpectations(t) } -// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there +// CacheAwareLargeSegmentSynchronizer +func TestSynchronizeLargeSegment(t *testing.T) { + lsName := "largeSegment1" -type splitUpdaterMock struct { - mock.Mock -} + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges(lsName)).Once() -// LocalKill implements split.Updater -func (s *splitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) { - s.Called(splitName, defaultTreatment, changeNumber) -} + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(-1)).Once() -// SynchronizeFeatureFlags implements split.Updater -func (s *splitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { - args := s.Called(ffChange) - return args.Get(0).(*split.UpdateResult), args.Error(1) -} + var lsUpdater mocks.LargeSegmentUpdaterMock + cnToReturn := int64(100) + lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return(&cnToReturn, nil).Once() -// SynchronizeSplits implements split.Updater -func (s *splitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { - args := s.Called(till) - return args.Get(0).(*split.UpdateResult), args.Error(1) -} + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, + } -// ---- + cn, err := clsSync.SynchronizeLargeSegment(lsName, nil) + if err != nil { + t.Error("Error should be nil. Actual: ", err) + } -type cacheFlusherMock struct { - mock.Mock + if *cn != 100 { + t.Error("ChangeNumber should be 100. Actual: ", *cn) + } } -func (c *cacheFlusherMock) Evict(key string) { c.Called(key) } -func (c *cacheFlusherMock) EvictAll() { c.Called() } -func (c *cacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) } +func TestSynchronizeLargeSegmentHighestPrevious(t *testing.T) { + lsName := "largeSegment1" -// --- + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges(lsName)).Times(0) -type splitStorageMock struct { - mock.Mock -} + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(200)).Once() -func (s *splitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") } -func (s *splitStorageMock) ChangeNumber() (int64, error) { - args := s.Called() - return args.Get(0).(int64), args.Error(1) -} + var lsUpdater mocks.LargeSegmentUpdaterMock + cnToReturn := int64(100) + lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return(&cnToReturn, nil).Once() -func (*splitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO { - panic("unimplemented") -} -func (*splitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string { - panic("unimplemented") -} -func (*splitStorageMock) GetAllFlagSetNames() []string { - panic("unimplemented") -} -func (*splitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) { - panic("unimplemented") -} -func (s *splitStorageMock) SegmentNames() *set.ThreadUnsafeSet { - return s.Called().Get(0).(*set.ThreadUnsafeSet) -} -func (s *splitStorageMock) SetChangeNumber(changeNumber int64) error { - return s.Called(changeNumber).Error(0) -} -func (*splitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") } -func (*splitStorageMock) SplitNames() []string { panic("unimplemented") } -func (*splitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") } -func (*splitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) { - panic("unimplemented") -} - -type segmentUpdaterMock struct { - mock.Mock -} - -func (s *segmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") } -func (s *segmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") } + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, + } -func (s *segmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { - args := s.Called(name, till) - return args.Get(0).(*segment.UpdateResult), args.Error(1) -} + cn, err := clsSync.SynchronizeLargeSegment(lsName, nil) + if err != nil { + t.Error("Error should be nil. Actual: ", err) + } -func (s *segmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { - args := s.Called() - return args.Get(0).(map[string]segment.UpdateResult), args.Error(1) + if *cn != 100 { + t.Error("ChangeNumber should be 100. Actual: ", *cn) + } } -type segmentStorageMock struct { - mock.Mock -} +func TestSynchronizeLargeSegments(t *testing.T) { + var splitStorage mocks.SplitStorageMock + splitStorage.On("LargeSegmentNames").Return(set.NewSet("ls1", "ls2")) -func (*segmentStorageMock) SetChangeNumber(segmentName string, till int64) error { - panic("unimplemented") -} -func (s *segmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error { - return s.Called(name, toAdd, toRemove, changeNumber).Error(0) -} + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges("ls1")).Once() + cacheFlusher.On("EvictBySurrogate", MakeSurrogateForLargeSegmentChanges("ls2")).Once() -// ChangeNumber implements storage.SegmentStorage -func (s *segmentStorageMock) ChangeNumber(segmentName string) (int64, error) { - args := s.Called(segmentName) - return args.Get(0).(int64), args.Error(1) -} + var cn1 int64 = 100 + var cn2 int64 = 200 + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", "ls1").Return(cn1).Once() + largeSegmentStorage.On("ChangeNumber", "ls2").Return(cn2).Once() -func (*segmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") } -func (*segmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) { - panic("unimplemented") -} -func (*segmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") } - -/* - type segmentUpdaterMock struct { - SynchronizeSegmentCall func(name string, till *int64) (*segment.UpdateResult, error) - SynchronizeSegmentsCall func() (map[string]segment.UpdateResult, error) - SegmentNamesCall func() []interface{} - IsSegmentCachedCall func(segmentName string) bool + var lsUpdater mocks.LargeSegmentUpdaterMock + result := map[string]*int64{ + "ls1": &cn1, + "ls2": &cn2, } + lsUpdater.On("SynchronizeLargeSegments").Return(result, nil).Once() - func (s *segmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { - return s.SynchronizeSegmentCall(name, till) + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, } - func (s *segmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { - return s.SynchronizeSegmentsCall() + cn, err := clsSync.SynchronizeLargeSegments() + if err != nil { + t.Error("Error should be nil. Actual: ", err) } - func (s *segmentUpdaterMock) SegmentNames() []interface{} { - return s.SegmentNamesCall() + if *cn["ls1"] != cn1 { + t.Error("ChangeNumber should be 100. Actual: ", *cn["ls1"]) } - func (s *segmentUpdaterMock) IsSegmentCached(segmentName string) bool { - return s.IsSegmentCachedCall(segmentName) + if *cn["ls2"] != cn2 { + t.Error("ChangeNumber should be 200. Actual: ", *cn["ls2"]) } -*/ -var _ split.Updater = (*splitUpdaterMock)(nil) -var _ storage.SplitStorage = (*splitStorageMock)(nil) -var _ gincache.CacheFlusher = (*cacheFlusherMock)(nil) -var _ segment.Updater = (*segmentUpdaterMock)(nil) -var _ storage.SegmentStorage = (*segmentStorageMock)(nil) +} diff --git a/splitio/proxy/storage/splits.go b/splitio/proxy/storage/splits.go index 9b3f2f8c..2d6fa6e6 100644 --- a/splitio/proxy/storage/splits.go +++ b/splitio/proxy/storage/splits.go @@ -156,6 +156,11 @@ func (p *ProxySplitStorageImpl) FetchMany(names []string) map[string]*dtos.Split // SegmentNames call is forwarded to the snapshot func (p *ProxySplitStorageImpl) SegmentNames() *set.ThreadUnsafeSet { return p.snapshot.SegmentNames() } +// LargeSegmentNames call is forwarded to the snapshot +func (p *ProxySplitStorageImpl) LargeSegmentNames() *set.ThreadUnsafeSet { + return p.snapshot.LargeSegmentNames() +} + // Split call is forwarded to the snapshot func (p *ProxySplitStorageImpl) Split(name string) *dtos.SplitDTO { return p.snapshot.Split(name) }