From a0d8bc32a12d4dda9b533f26f4a7142effe3bdfb Mon Sep 17 00:00:00 2001 From: Mauro Antonio Sanz Date: Fri, 15 Nov 2024 18:13:46 -0300 Subject: [PATCH] connecting proxy init with commons --- go.mod | 2 +- go.sum | 6 ++++-- splitio/producer/conf/sections.go | 2 +- splitio/proxy/caching/workers_test.go | 3 +++ splitio/proxy/conf/sections.go | 7 ++++--- splitio/proxy/initialization.go | 6 ++++++ splitio/proxy/storage/splits.go | 5 +++++ 7 files changed, 24 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 35f9e160..32c24eab 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/gin-gonic/gin v1.10.0 github.com/google/uuid v1.3.0 github.com/splitio/gincache v1.0.1 - github.com/splitio/go-split-commons/v6 v6.0.1 + github.com/splitio/go-split-commons/v6 v6.0.2-0.20241114213309-4748736195ee github.com/splitio/go-toolkit/v5 v5.4.0 github.com/stretchr/testify v1.9.0 go.etcd.io/bbolt v1.3.6 diff --git a/go.sum b/go.sum index 9f6e2ecd..8a015d76 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,10 @@ github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUA github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= github.com/splitio/gincache v1.0.1 h1:dLYdANY/BqH4KcUMCe/LluLyV5WtuE/LEdQWRE06IXU= github.com/splitio/gincache v1.0.1/go.mod h1:CcgJDSM9Af75kyBH0724v55URVwMBuSj5x1eCWIOECY= -github.com/splitio/go-split-commons/v6 v6.0.1 h1:WJcvTk8lwWw6kLQvxt8hOkY/tGlBN4w+2agkINPGugY= -github.com/splitio/go-split-commons/v6 v6.0.1/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241113185534-b1abedb428ec h1:X3+xCQQlfUWwn20FEuIj86Ut4gbok9jUJfsF0G7ba8s= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241113185534-b1abedb428ec/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241114213309-4748736195ee h1:OzBBbeZn7PZ18oV+PBrYn6tmDx2xC1/DojSmS/PV24s= +github.com/splitio/go-split-commons/v6 v6.0.2-0.20241114213309-4748736195ee/go.mod h1:TsvIh3XP7yjc7ly4vpj06AkoBND36SodPs5qfhb8rHc= github.com/splitio/go-toolkit/v5 v5.4.0 h1:g5WFpRhQomnXCmvfsNOWV4s5AuUrWIZ+amM68G8NBKM= github.com/splitio/go-toolkit/v5 v5.4.0/go.mod h1:xYhUvV1gga9/1029Wbp5pjnR6Cy8nvBpjw99wAbsMko= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/splitio/producer/conf/sections.go b/splitio/producer/conf/sections.go index 9c98850c..454c5033 100644 --- a/splitio/producer/conf/sections.go +++ b/splitio/producer/conf/sections.go @@ -17,7 +17,7 @@ type Main struct { Integrations conf.Integrations `json:"integrations" s-nested:"true"` Logging conf.Logging `json:"logging" s-nested:"true"` Healthcheck Healthcheck `json:"healthcheck" s-nested:"true"` - FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.1" s-desc:"Spec version for flags"` + FlagSpecVersion string `json:"flagSpecVersion" s-cli:"flag-spec-version" s-def:"1.2" s-desc:"Spec version for flags"` } // BuildAdvancedConfig generates a commons-compatible advancedconfig with default + overriden parameters diff --git a/splitio/proxy/caching/workers_test.go b/splitio/proxy/caching/workers_test.go index ad19e847..369638c2 100644 --- a/splitio/proxy/caching/workers_test.go +++ b/splitio/proxy/caching/workers_test.go @@ -298,6 +298,9 @@ func (*splitStorageMock) KillLocally(splitName string, defaultTreatment string, func (s *splitStorageMock) SegmentNames() *set.ThreadUnsafeSet { return s.Called().Get(0).(*set.ThreadUnsafeSet) } +func (s *splitStorageMock) LargeSegmentNames() *set.ThreadUnsafeSet { + return s.Called().Get(0).(*set.ThreadUnsafeSet) +} func (s *splitStorageMock) SetChangeNumber(changeNumber int64) error { return s.Called(changeNumber).Error(0) } diff --git a/splitio/proxy/conf/sections.go b/splitio/proxy/conf/sections.go index 4b3df854..25671182 100644 --- a/splitio/proxy/conf/sections.go +++ b/splitio/proxy/conf/sections.go @@ -68,9 +68,10 @@ type Persistent struct { // Sync configuration options type Sync struct { - SplitRefreshRateMs int64 `json:"splitRefreshRateMs" s-cli:"split-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh feature flags"` - SegmentRefreshRateMs int64 `json:"segmentRefreshRateMs" s-cli:"segment-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh segments"` - Advanced AdvancedSync `json:"advanced" s-nested:"true"` + SplitRefreshRateMs int64 `json:"splitRefreshRateMs" s-cli:"split-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh feature flags"` + SegmentRefreshRateMs int64 `json:"segmentRefreshRateMs" s-cli:"segment-refresh-rate-ms" s-def:"60000" s-desc:"How often to refresh segments"` + Advanced AdvancedSync `json:"advanced" s-nested:"true"` + LargeSegmentRefreshRateMs int64 `json:"largeSegmentRefreshRateMs" s-cli:"large-segment-refresh-rate-ms" s-def:"3600000" s-desc:"How often to refresh large segments"` } // AdvancedSync configuration options diff --git a/splitio/proxy/initialization.go b/splitio/proxy/initialization.go index 30a3ca6f..b0b572d2 100644 --- a/splitio/proxy/initialization.go +++ b/splitio/proxy/initialization.go @@ -12,7 +12,9 @@ import ( "github.com/splitio/go-split-commons/v6/conf" "github.com/splitio/go-split-commons/v6/flagsets" "github.com/splitio/go-split-commons/v6/service/api" + "github.com/splitio/go-split-commons/v6/storage/inmemory/mutexmap" "github.com/splitio/go-split-commons/v6/synchronizer" + "github.com/splitio/go-split-commons/v6/synchronizer/worker/largesegment" "github.com/splitio/go-split-commons/v6/tasks" "github.com/splitio/go-split-commons/v6/telemetry" "github.com/splitio/go-toolkit/v5/backoff" @@ -85,6 +87,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { // Proxy storages already implement the observable interface, so no need to wrap them splitStorage := storage.NewProxySplitStorage(dbInstance, logger, flagsets.NewFlagSetFilter(cfg.FlagSetsFilter), cfg.Initialization.Snapshot != "") segmentStorage := storage.NewProxySegmentStorage(dbInstance, logger, cfg.Initialization.Snapshot != "") + largeSegmentStorage := mutexmap.NewLargeSegmentsStorage() // Local telemetry tbufferSize := int(cfg.Sync.Advanced.TelemetryBuffer) @@ -124,6 +127,7 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { appMonitor), TelemetryRecorder: telemetry.NewTelemetrySynchronizer(localTelemetryStorage, telemetryRecorder, splitStorage, segmentStorage, logger, metadata, localTelemetryStorage), + LargeSegmentUpdater: largesegment.NewLargeSegmentUpdater(splitStorage, largeSegmentStorage, splitAPI.LargeSegmentFetcher, logger, localTelemetryStorage, appMonitor), } // setup periodic tasks in case streaming is disabled or we need to fall back to polling @@ -135,6 +139,8 @@ func Start(logger logging.LoggerInterface, cfg *pconf.Main) error { ImpressionSyncTask: impressionTask, ImpressionsCountSyncTask: impressionCountTask, EventSyncTask: eventsTask, + LargeSegmentSyncTask: tasks.NewFetchLargeSegmentsTask(workers.LargeSegmentUpdater, splitStorage, int(cfg.Sync.LargeSegmentRefreshRateMs/1000), + advanced.LargeSegmentWorkers, advanced.LargeSegmentQueueSize, logger), } // Creating Synchronizer for tasks diff --git a/splitio/proxy/storage/splits.go b/splitio/proxy/storage/splits.go index 9b3f2f8c..63386862 100644 --- a/splitio/proxy/storage/splits.go +++ b/splitio/proxy/storage/splits.go @@ -153,6 +153,11 @@ func (p *ProxySplitStorageImpl) FetchMany(names []string) map[string]*dtos.Split return p.snapshot.FetchMany(names) } +// LargeSegmentNames +func (p *ProxySplitStorageImpl) LargeSegmentNames() *set.ThreadUnsafeSet { + return p.snapshot.LargeSegmentNames() +} + // SegmentNames call is forwarded to the snapshot func (p *ProxySplitStorageImpl) SegmentNames() *set.ThreadUnsafeSet { return p.snapshot.SegmentNames() }