diff --git a/go.mod b/go.mod index 35f9e160..7d7e7746 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.1 + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 9f6e2ecd..d7333a81 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,8 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU= github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= -github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY= -github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f h1:FbR7KHn2kdtBda7CZVxut2qfFH7ZI4XW+FV6B0JpWgM= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241126190617-03ffbb3a100f/go.mod h1:D/XIY/9Hmfk9ivWsRsJVp439kEdmHbzUi3PKzQQDOXY= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/proxy/caching/caching.go b/splitio/proxy/caching/caching.go index efae93cf..665731af 100644 --- a/splitio/proxy/caching/caching.go +++ b/splitio/proxy/caching/caching.go @@ -19,6 +19,9 @@ const ( // SplitSurrogate key (we only need one, since all splitChanges should be expired when an update is processed) SplitSurrogate = "sp" + // LargeSegmentSurrogate key (we only need one, since all memberships should be expired when an update is processed) + LargeSegmentSurrogate = "ls" + // AuthSurrogate key (having push disabled, it's safe to cache this and return it on all requests) AuthSurrogate = "au" @@ -60,7 +63,6 @@ func MakeProxyCache() *gincache.Middleware { } func keyFactoryFN(ctx *gin.Context) string { - var encodingPrefix string if strings.Contains(ctx.Request.Header.Get("Accept-Encoding"), "gzip") { encodingPrefix = "gzip::" diff --git a/splitio/proxy/caching/mocks/mock.go b/splitio/proxy/caching/mocks/mock.go new file mode 100644 index 00000000..d3ef093d --- /dev/null +++ b/splitio/proxy/caching/mocks/mock.go @@ -0,0 +1,189 @@ +package mocks + +import ( + "github.com/splitio/gincache" + "github.com/splitio/go-split-commons/v6/dtos" + "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" + "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/stretchr/testify/mock" +) + +// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there + +type SplitUpdaterMock struct { + mock.Mock +} + +// LocalKill implements split.Updater +func (s *SplitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) { + s.Called(splitName, defaultTreatment, changeNumber) +} + +// SynchronizeFeatureFlags implements split.Updater +func (s *SplitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { + args := s.Called(ffChange) + return args.Get(0).(*split.UpdateResult), args.Error(1) +} + +// SynchronizeSplits implements split.Updater +func (s *SplitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { + args := s.Called(till) + return args.Get(0).(*split.UpdateResult), args.Error(1) +} + +// ---- + +type CacheFlusherMock struct { + mock.Mock +} + +func (c *CacheFlusherMock) Evict(key string) { c.Called(key) } +func (c *CacheFlusherMock) EvictAll() { c.Called() } +func (c *CacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) } + +// --- + +type SplitStorageMock struct { + mock.Mock +} + +func (s *SplitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") } +func (s *SplitStorageMock) ChangeNumber() (int64, error) { + args := s.Called() + return args.Get(0).(int64), args.Error(1) +} + +func (*SplitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO { + panic("unimplemented") +} +func (*SplitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string { + panic("unimplemented") +} +func (*SplitStorageMock) GetAllFlagSetNames() []string { + panic("unimplemented") +} +func (*SplitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) { + panic("unimplemented") +} +func (s *SplitStorageMock) SegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} +func (s *SplitStorageMock) LargeSegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} +func (s *SplitStorageMock) SetChangeNumber(changeNumber int64) error { + return s.Called(changeNumber).Error(0) +} +func (*SplitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") } +func (*SplitStorageMock) SplitNames() []string { panic("unimplemented") } +func (*SplitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") } +func (*SplitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) { + panic("unimplemented") +} + +type SegmentUpdaterMock struct { + mock.Mock +} + +func (s *SegmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") } +func (s *SegmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") } + +func (s *SegmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { + args := s.Called(name, till) + return args.Get(0).(*segment.UpdateResult), args.Error(1) +} + +func (s *SegmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { + args := s.Called() + return args.Get(0).(map[string]segment.UpdateResult), args.Error(1) +} + +type SegmentStorageMock struct { + mock.Mock +} + +func (*SegmentStorageMock) SetChangeNumber(segmentName string, till int64) error { + panic("unimplemented") +} +func (s *SegmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error { + return s.Called(name, toAdd, toRemove, changeNumber).Error(0) +} + +// ChangeNumber implements storage.SegmentStorage +func (s *SegmentStorageMock) ChangeNumber(segmentName string) (int64, error) { + args := s.Called(segmentName) + return args.Get(0).(int64), args.Error(1) +} + +func (*SegmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") } +func (*SegmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) { + panic("unimplemented") +} +func (*SegmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") } + +// --- + +type LargeSegmentStorageMock struct { + mock.Mock +} + +func (s *LargeSegmentStorageMock) SetChangeNumber(name string, till int64) { + s.Called(name, till).Error(0) +} + +func (s *LargeSegmentStorageMock) Update(name string, userKeys []string, till int64) { + s.Called(name, userKeys, till) +} +func (s *LargeSegmentStorageMock) ChangeNumber(name string) int64 { + args := s.Called(name) + return args.Get(0).(int64) +} +func (s *LargeSegmentStorageMock) Count() int { + args := s.Called() + return args.Get(0).(int) +} +func (s *LargeSegmentStorageMock) LargeSegmentsForUser(userKey string) []string { + return []string{} +} +func (s *LargeSegmentStorageMock) IsInLargeSegment(name string, key string) (bool, error) { + args := s.Called(name, key) + return args.Get(0).(bool), args.Error(1) +} +func (s *LargeSegmentStorageMock) TotalKeys(name string) int { + return s.Called(name).Get(0).(int) +} + +// --- + +type LargeSegmentUpdaterMock struct { + mock.Mock +} + +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegment(name string, till *int64) (*int64, error) { + args := u.Called(name, till) + return args.Get(0).(*int64), args.Error(1) +} +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegments() (map[string]*int64, error) { + args := u.Called() + return args.Get(0).(map[string]*int64), args.Error(1) +} +func (u *LargeSegmentUpdaterMock) IsCached(name string) bool { + return u.Called().Get(0).(bool) +} +func (u *LargeSegmentUpdaterMock) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) { + args := u.Called(lsRFDResponseDTO) + return args.Get(0).(*int64), args.Error(1) +} + +// --- + +var _ gincache.CacheFlusher = (*CacheFlusherMock)(nil) +var _ split.Updater = (*SplitUpdaterMock)(nil) +var _ segment.Updater = (*SegmentUpdaterMock)(nil) +var _ largesegment.Updater = (*LargeSegmentUpdaterMock)(nil) +var _ storage.SplitStorage = (*SplitStorageMock)(nil) +var _ storage.SegmentStorage = (*SegmentStorageMock)(nil) +var _ storage.LargeSegmentsStorage = (*LargeSegmentStorageMock)(nil) diff --git a/splitio/proxy/caching/workers.go b/splitio/proxy/caching/workers.go index deb9be13..52dc03ab 100644 --- a/splitio/proxy/caching/workers.go +++ b/splitio/proxy/caching/workers.go @@ -6,6 +6,7 @@ import ( "github.com/splitio/go-split-commons/v6/healthcheck/application" "github.com/splitio/go-split-commons/v6/service" "github.com/splitio/go-split-commons/v6/storage" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" "github.com/splitio/go-toolkit/v5/logging" @@ -98,6 +99,7 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegment(name string, till *in result, err := c.wrapped.SynchronizeSegment(name, till) if current := result.NewChangeNumber; current > previous || (previous != -1 && current == -1) { c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(name)) + c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) } // remove individual entries for each affected key @@ -129,6 +131,7 @@ func (c *CacheAwareSegmentSynchronizer) SynchronizeSegments() (map[string]segmen if pcn, _ := previousCNs[segmentName]; ccn > pcn || (pcn > 0 && ccn == -1) { // if the segment was updated or the segment was removed, evict it c.cacheFlusher.EvictBySurrogate(MakeSurrogateForSegmentChanges(segmentName)) + c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) } for idx := range result.UpdatedKeys { @@ -151,3 +154,74 @@ func (c *CacheAwareSegmentSynchronizer) SegmentNames() []interface{} { func (c *CacheAwareSegmentSynchronizer) IsSegmentCached(segmentName string) bool { return c.wrapped.IsSegmentCached(segmentName) } + +// CacheAwareLargeSegmentSynchronizer +type CacheAwareLargeSegmentSynchronizer struct { + wrapped largesegment.Updater + largeSegmentStorage storage.LargeSegmentsStorage + cacheFlusher gincache.CacheFlusher + splitStorage storage.SplitStorage +} + +func NewCacheAwareLargeSegmentSync( + splitStorage storage.SplitStorage, + largeSegmentStorage storage.LargeSegmentsStorage, + largeSegmentFetcher service.LargeSegmentFetcher, + logger logging.LoggerInterface, + runtimeTelemetry storage.TelemetryRuntimeProducer, + cacheFlusher gincache.CacheFlusher, + appMonitor application.MonitorProducerInterface, +) *CacheAwareLargeSegmentSynchronizer { + return &CacheAwareLargeSegmentSynchronizer{ + wrapped: largesegment.NewLargeSegmentUpdater(splitStorage, largeSegmentStorage, largeSegmentFetcher, logger, runtimeTelemetry, appMonitor), + cacheFlusher: cacheFlusher, + largeSegmentStorage: largeSegmentStorage, + splitStorage: splitStorage, + } +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegment(name string, till *int64) (*int64, error) { + previous := c.largeSegmentStorage.ChangeNumber(name) + newCN, err := c.wrapped.SynchronizeLargeSegment(name, till) + + c.evictByLargeSegmentSurrogate(previous, *newCN) + + return newCN, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegments() (map[string]*int64, error) { + previousLargeSegmentNames := c.splitStorage.LargeSegmentNames() + previousCNs := make(map[string]int64, previousLargeSegmentNames.Size()) + for _, name := range previousLargeSegmentNames.List() { + if strName, ok := name.(string); ok { + cn := c.largeSegmentStorage.ChangeNumber(strName) + previousCNs[strName] = cn + } + } + + results, err := c.wrapped.SynchronizeLargeSegments() + for name, currentCN := range results { + c.evictByLargeSegmentSurrogate(previousCNs[name], *currentCN) + } + + return results, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) IsCached(name string) bool { + return c.wrapped.IsCached(name) +} + +func (c *CacheAwareLargeSegmentSynchronizer) SynchronizeLargeSegmentUpdate(lsRFDResponseDTO *dtos.LargeSegmentRFDResponseDTO) (*int64, error) { + previous := c.largeSegmentStorage.ChangeNumber(lsRFDResponseDTO.Name) + newCN, err := c.wrapped.SynchronizeLargeSegmentUpdate(lsRFDResponseDTO) + + c.evictByLargeSegmentSurrogate(previous, *newCN) + + return newCN, err +} + +func (c *CacheAwareLargeSegmentSynchronizer) evictByLargeSegmentSurrogate(previousCN int64, currentCN int64) { + if currentCN > previousCN || currentCN == -1 { + c.cacheFlusher.EvictBySurrogate(LargeSegmentSurrogate) + } +} diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index ad19e847..4eaf70fe 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -3,21 +3,19 @@ package caching import ( "testing" - "github.com/splitio/gincache" "github.com/splitio/go-split-commons/v6/dtos" - "github.com/splitio/go-split-commons/v6/storage" "github.com/splitio/go-split-commons/v6/synchronizer/worker/segment" "github.com/splitio/go-split-commons/v6/synchronizer/worker/split" "github.com/splitio/go-toolkit/v5/datastructures/set" + "github.com/splitio/split-synchronizer/v5/splitio/proxy/caching/mocks" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" ) func TestCacheAwareSplitSyncNoChanges(t *testing.T) { - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), error(nil)) - var cacheFlusherMock cacheFlusherMock - var storageMock splitStorageMock + var cacheFlusherMock mocks.CacheFlusherMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)) css := CacheAwareSplitSynchronizer{ @@ -36,13 +34,13 @@ func TestCacheAwareSplitSyncNoChanges(t *testing.T) { } func TestCacheAwareSplitSyncChanges(t *testing.T) { - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeSplits", (*int64)(nil)).Return((*split.UpdateResult)(nil), error(nil)).Times(2) - var cacheFlusherMock cacheFlusherMock + var cacheFlusherMock mocks.CacheFlusherMock cacheFlusherMock.On("EvictBySurrogate", SplitSurrogate).Times(3) - var storageMock splitStorageMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once() storageMock.On("ChangeNumber").Return(int64(1), error(nil)).Once() @@ -75,13 +73,13 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) { // This test is used to test the new method. Eventually commons should be cleaned in order to have a single method for split-synchronization. // when that happens, either this or the previous test shold be removed - var splitSyncMock splitUpdaterMock + var splitSyncMock mocks.SplitUpdaterMock splitSyncMock.On("SynchronizeFeatureFlags", (*dtos.SplitChangeUpdate)(nil)).Return((*split.UpdateResult)(nil), error(nil)).Times(2) - var cacheFlusherMock cacheFlusherMock + var cacheFlusherMock mocks.CacheFlusherMock cacheFlusherMock.On("EvictBySurrogate", SplitSurrogate).Times(2) - var storageMock splitStorageMock + var storageMock mocks.SplitStorageMock storageMock.On("ChangeNumber").Return(int64(-1), error(nil)).Once() storageMock.On("ChangeNumber").Return(int64(1), error(nil)).Once() @@ -108,14 +106,12 @@ func TestCacheAwareSplitSyncChangesNewMethod(t *testing.T) { } func TestCacheAwareSegmentSyncNoChanges(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{}, nil).Once() - var splitStorage splitStorageMock - - var cacheFlusher cacheFlusherMock - - var segmentStorage segmentStorageMock + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -136,20 +132,21 @@ func TestCacheAwareSegmentSyncNoChanges(t *testing.T) { } func TestCacheAwareSegmentSyncSingle(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegment", "segment1", (*int64)(nil)).Return(&segment.UpdateResult{ UpdatedKeys: []string{"k1"}, NewChangeNumber: 2, }, nil).Once() - var splitStorage splitStorageMock + var splitStorage mocks.SplitStorageMock - var cacheFlusher cacheFlusherMock + var cacheFlusher mocks.CacheFlusherMock cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment1")).Times(2) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(2) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(2) + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Times(2) - var segmentStorage segmentStorageMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment1").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -180,21 +177,22 @@ func TestCacheAwareSegmentSyncSingle(t *testing.T) { } func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { - var segmentUpdater segmentUpdaterMock + var segmentUpdater mocks.SegmentUpdaterMock segmentUpdater.On("SynchronizeSegments").Return(map[string]segment.UpdateResult{"segment2": { UpdatedKeys: []string{"k1"}, NewChangeNumber: 1, }}, nil).Once() - var splitStorage splitStorageMock + var splitStorage mocks.SplitStorageMock splitStorage.On("SegmentNames").Return(set.NewSet("segment2")).Once() - var cacheFlusher cacheFlusherMock + var cacheFlusher mocks.CacheFlusherMock cacheFlusher.On("EvictBySurrogate", MakeSurrogateForSegmentChanges("segment2")).Times(1) cacheFlusher.On("Evict", "/api/mySegments/k1").Times(3) cacheFlusher.On("Evict", "gzip::/api/mySegments/k1").Times(3) + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Times(3) - var segmentStorage segmentStorageMock + var segmentStorage mocks.SegmentStorageMock segmentStorage.On("ChangeNumber", "segment2").Return(int64(0), nil).Once() css := CacheAwareSegmentSynchronizer{ @@ -238,142 +236,119 @@ func TestCacheAwareSegmentSyncAllSegments(t *testing.T) { cacheFlusher.AssertExpectations(t) } -// Borrowed mocks: These sohuld be in go-split-commons. but we need to wait until testify is adopted there +// CacheAwareLargeSegmentSynchronizer +func TestSynchronizeLargeSegment(t *testing.T) { + lsName := "largeSegment1" -type splitUpdaterMock struct { - mock.Mock -} + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Once() -// LocalKill implements split.Updater -func (s *splitUpdaterMock) LocalKill(splitName string, defaultTreatment string, changeNumber int64) { - s.Called(splitName, defaultTreatment, changeNumber) -} + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(-1)).Once() -// SynchronizeFeatureFlags implements split.Updater -func (s *splitUpdaterMock) SynchronizeFeatureFlags(ffChange *dtos.SplitChangeUpdate) (*split.UpdateResult, error) { - args := s.Called(ffChange) - return args.Get(0).(*split.UpdateResult), args.Error(1) -} + var lsUpdater mocks.LargeSegmentUpdaterMock + cnToReturn := int64(100) + lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return(&cnToReturn, nil).Once() -// SynchronizeSplits implements split.Updater -func (s *splitUpdaterMock) SynchronizeSplits(till *int64) (*split.UpdateResult, error) { - args := s.Called(till) - return args.Get(0).(*split.UpdateResult), args.Error(1) -} + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, + } -// ---- + cn, err := clsSync.SynchronizeLargeSegment(lsName, nil) + if err != nil { + t.Error("Error should be nil. Actual: ", err) + } + + if *cn != 100 { + t.Error("ChangeNumber should be 100. Actual: ", *cn) + } -type cacheFlusherMock struct { - mock.Mock + cacheFlusher.AssertExpectations(t) + largeSegmentStorage.AssertExpectations(t) + lsUpdater.AssertExpectations(t) } -func (c *cacheFlusherMock) Evict(key string) { c.Called(key) } -func (c *cacheFlusherMock) EvictAll() { c.Called() } -func (c *cacheFlusherMock) EvictBySurrogate(surrogate string) { c.Called(surrogate) } +func TestSynchronizeLargeSegmentHighestPrevious(t *testing.T) { + lsName := "largeSegment1" -// --- + var splitStorage mocks.SplitStorageMock + var cacheFlusher mocks.CacheFlusherMock -type splitStorageMock struct { - mock.Mock -} + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", lsName).Return(int64(200)).Once() -func (s *splitStorageMock) All() []dtos.SplitDTO { panic("unimplemented") } -func (s *splitStorageMock) ChangeNumber() (int64, error) { - args := s.Called() - return args.Get(0).(int64), args.Error(1) -} + var lsUpdater mocks.LargeSegmentUpdaterMock + cnToReturn := int64(100) + lsUpdater.On("SynchronizeLargeSegment", lsName, (*int64)(nil)).Return(&cnToReturn, nil).Once() -func (*splitStorageMock) FetchMany(splitNames []string) map[string]*dtos.SplitDTO { - panic("unimplemented") -} -func (*splitStorageMock) GetNamesByFlagSets(sets []string) map[string][]string { - panic("unimplemented") -} -func (*splitStorageMock) GetAllFlagSetNames() []string { - panic("unimplemented") -} -func (*splitStorageMock) KillLocally(splitName string, defaultTreatment string, changeNumber int64) { - panic("unimplemented") -} -func (s *splitStorageMock) SegmentNames() *set.ThreadUnsafeSet { - return s.Called().Get(0).(*set.ThreadUnsafeSet) -} -func (s *splitStorageMock) SetChangeNumber(changeNumber int64) error { - return s.Called(changeNumber).Error(0) -} -func (*splitStorageMock) Split(splitName string) *dtos.SplitDTO { panic("unimplemented") } -func (*splitStorageMock) SplitNames() []string { panic("unimplemented") } -func (*splitStorageMock) TrafficTypeExists(trafficType string) bool { panic("unimplemented") } -func (*splitStorageMock) Update(toAdd []dtos.SplitDTO, toRemove []dtos.SplitDTO, changeNumber int64) { - panic("unimplemented") -} - -type segmentUpdaterMock struct { - mock.Mock -} + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, + } -func (s *segmentUpdaterMock) IsSegmentCached(segmentName string) bool { panic("unimplemented") } -func (s *segmentUpdaterMock) SegmentNames() []interface{} { panic("unimplemented") } + cn, err := clsSync.SynchronizeLargeSegment(lsName, nil) + if err != nil { + t.Error("Error should be nil. Actual: ", err) + } -func (s *segmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { - args := s.Called(name, till) - return args.Get(0).(*segment.UpdateResult), args.Error(1) -} + if *cn != 100 { + t.Error("ChangeNumber should be 100. Actual: ", *cn) + } -func (s *segmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { - args := s.Called() - return args.Get(0).(map[string]segment.UpdateResult), args.Error(1) + splitStorage.AssertExpectations(t) + cacheFlusher.AssertExpectations(t) + largeSegmentStorage.AssertExpectations(t) + lsUpdater.AssertExpectations(t) } -type segmentStorageMock struct { - mock.Mock -} +func TestSynchronizeLargeSegments(t *testing.T) { + var splitStorage mocks.SplitStorageMock + splitStorage.On("LargeSegmentNames").Return(set.NewSet("ls1", "ls2")) -func (*segmentStorageMock) SetChangeNumber(segmentName string, till int64) error { - panic("unimplemented") -} -func (s *segmentStorageMock) Update(name string, toAdd *set.ThreadUnsafeSet, toRemove *set.ThreadUnsafeSet, changeNumber int64) error { - return s.Called(name, toAdd, toRemove, changeNumber).Error(0) -} + var cacheFlusher mocks.CacheFlusherMock + cacheFlusher.On("EvictBySurrogate", LargeSegmentSurrogate).Times(2) -// ChangeNumber implements storage.SegmentStorage -func (s *segmentStorageMock) ChangeNumber(segmentName string) (int64, error) { - args := s.Called(segmentName) - return args.Get(0).(int64), args.Error(1) -} + var cn1 int64 = 100 + var cn2 int64 = 200 + var largeSegmentStorage mocks.LargeSegmentStorageMock + largeSegmentStorage.On("ChangeNumber", "ls1").Return(cn1 - 50).Once() + largeSegmentStorage.On("ChangeNumber", "ls2").Return(cn2 - 50).Once() -func (*segmentStorageMock) Keys(segmentName string) *set.ThreadUnsafeSet { panic("unimplemented") } -func (*segmentStorageMock) SegmentContainsKey(segmentName string, key string) (bool, error) { - panic("unimplemented") -} -func (*segmentStorageMock) SegmentKeysCount() int64 { panic("unimplemented") } - -/* - type segmentUpdaterMock struct { - SynchronizeSegmentCall func(name string, till *int64) (*segment.UpdateResult, error) - SynchronizeSegmentsCall func() (map[string]segment.UpdateResult, error) - SegmentNamesCall func() []interface{} - IsSegmentCachedCall func(segmentName string) bool + var lsUpdater mocks.LargeSegmentUpdaterMock + result := map[string]*int64{ + "ls1": &cn1, + "ls2": &cn2, } + lsUpdater.On("SynchronizeLargeSegments").Return(result, nil).Once() - func (s *segmentUpdaterMock) SynchronizeSegment(name string, till *int64) (*segment.UpdateResult, error) { - return s.SynchronizeSegmentCall(name, till) + clsSync := CacheAwareLargeSegmentSynchronizer{ + wrapped: &lsUpdater, + cacheFlusher: &cacheFlusher, + largeSegmentStorage: &largeSegmentStorage, + splitStorage: &splitStorage, } - func (s *segmentUpdaterMock) SynchronizeSegments() (map[string]segment.UpdateResult, error) { - return s.SynchronizeSegmentsCall() + cn, err := clsSync.SynchronizeLargeSegments() + if err != nil { + t.Error("Error should be nil. Actual: ", err) } - func (s *segmentUpdaterMock) SegmentNames() []interface{} { - return s.SegmentNamesCall() + if *cn["ls1"] != cn1 { + t.Error("ChangeNumber should be 100. Actual: ", *cn["ls1"]) } - func (s *segmentUpdaterMock) IsSegmentCached(segmentName string) bool { - return s.IsSegmentCachedCall(segmentName) + if *cn["ls2"] != cn2 { + t.Error("ChangeNumber should be 200. Actual: ", *cn["ls2"]) } -*/ -var _ split.Updater = (*splitUpdaterMock)(nil) -var _ storage.SplitStorage = (*splitStorageMock)(nil) -var _ gincache.CacheFlusher = (*cacheFlusherMock)(nil) -var _ segment.Updater = (*segmentUpdaterMock)(nil) -var _ storage.SegmentStorage = (*segmentStorageMock)(nil) + + splitStorage.AssertExpectations(t) + cacheFlusher.AssertExpectations(t) + largeSegmentStorage.AssertExpectations(t) + lsUpdater.AssertExpectations(t) +} diff --git a/splitio/proxy/storage/splits.go b/splitio/proxy/storage/splits.go index 9b3f2f8c..2d6fa6e6 100644 --- a/splitio/proxy/storage/splits.go +++ b/splitio/proxy/storage/splits.go @@ -156,6 +156,11 @@ func (p *ProxySplitStorageImpl) FetchMany(names []string) map[string]*dtos.Split // SegmentNames call is forwarded to the snapshot func (p *ProxySplitStorageImpl) SegmentNames() *set.ThreadUnsafeSet { return p.snapshot.SegmentNames() } +// LargeSegmentNames call is forwarded to the snapshot +func (p *ProxySplitStorageImpl) LargeSegmentNames() *set.ThreadUnsafeSet { + return p.snapshot.LargeSegmentNames() +} + // Split call is forwarded to the snapshot func (p *ProxySplitStorageImpl) Split(name string) *dtos.SplitDTO { return p.snapshot.Split(name) }