diff --git a/CHANGES.txt b/CHANGES.txt index aee83d31..23b8dda9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,3 +1,9 @@ +6.2.1 (Oct 28, 2022) +- Updated Matchers logging: demoted the log message to "warning". + +6.2.0 (Oct 12, 2022) +- Added a new impressions mode for the SDK called NONE, to be used when there is no desire to capture impressions on an SDK factory to feed Split's analytics engine. In NONE mode, the SDK captures only the unique keys evaluated for a particular feature flag instead of full-blown impressions. + 6.1.8 (Sep 9, 2022) - Updated BooleanMatcher logging: demoted the log message to "warning". @@ -37,7 +43,10 @@ - BREAKING CHANGE: Migrated to go modules (dep & bare-bones go-dep no longer supported). 5.3.0 (Oct 6, 2020) -- Added local impressions deduping (enabled by default). +- Added impressions dedupe logic to avoid sending duplicated impressions: + - Added `OPTIMIZED` and `DEBUG` modes to control how impressions are sent to Split servers: + - `OPTIMIZED`: will send unique impressions in a timeframe in order to reduce how many times impressions are posted to Split. + - `DEBUG`: will send every impression generated to Split. 5.2.2 (Sep 10, 2020) - Fixed possible issue with SSE client using 100% cpu. diff --git a/README.md b/README.md index 4a694d9a..2eb53c23 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # Split GO SDK -[![Build Status](https://api.travis-ci.com/splitio/go-client.svg?branch=master)](https://api.travis-ci.com/splitio/go-client) +[![build workflow](https://github.com/splitio/go-client/actions/workflows/ci.yml/badge.svg)](https://github.com/splitio/go-client/actions) +[![PkgGoDev](https://pkg.go.dev/badge/github.com/splitio/go-client/v6)](https://pkg.go.dev/github.com/splitio/go-client/v6/splitio?tab=doc) +[![Documentation](https://img.shields.io/badge/go_client-documentation-informational)](https://help.split.io/hc/en-us/articles/360020093652-Go-SDK) ## Overview @@ -16,7 +18,7 @@ This SDK is compatible with Go 1.8. 
Below is a simple example that describes the instantiation and most basic usage of our SDK: -Run `go get github.com/splitio/go-client/` or `dep ensure -add github.com/splitio/go-client` +Run `go get github.com/splitio/go-client/` ```go package main diff --git a/go.mod b/go.mod index 2a5a0ce7..d98b89d0 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,6 @@ module github.com/splitio/go-client/v6 go 1.13 require ( - github.com/splitio/go-split-commons/v4 v4.1.3 - github.com/splitio/go-toolkit/v5 v5.2.1 + github.com/splitio/go-split-commons/v4 v4.2.0 + github.com/splitio/go-toolkit/v5 v5.2.2 ) diff --git a/go.sum b/go.sum index fc61266e..f84eea70 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,7 @@ +github.com/bits-and-blooms/bitset v1.3.1 h1:y+qrlmq3XsWi+xZqSaueaE8ry8Y127iMxlMfqcK8p0g= +github.com/bits-and-blooms/bitset v1.3.1/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= +github.com/bits-and-blooms/bloom/v3 v3.3.1 h1:K2+A19bXT8gJR5mU7y+1yW6hsKfNCjcP2uNfLFKncjQ= +github.com/bits-and-blooms/bloom/v3 v3.3.1/go.mod h1:bhUUknWd5khVbTe4UgMCSiOOVJzr3tMoijSK3WwvW90= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -47,12 +51,14 @@ github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAl github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/splitio/go-split-commons/v4 v4.1.3 h1:Aal1SF4GvmSP25P29oSLL6N9UMUhvFN/LURIxX2rACk= -github.com/splitio/go-split-commons/v4 v4.1.3/go.mod h1:E6yZiXwcjOdV5B7pHrBDbCsl4ju1YgYokRISZ9bYjgY= -github.com/splitio/go-toolkit/v5 v5.2.1 h1:WiAu7DD4Rl+Ly7Yz/8IDjhqAIySnlXlH6d7cG9KgoOY= -github.com/splitio/go-toolkit/v5 v5.2.1/go.mod h1:SYi/svhhtEgdMSb5tNcDcMjOSUH/7XVkvjp5dPL+nBE= +github.com/splitio/go-split-commons/v4 v4.2.0 h1:p49DFvyvsiHnVFkuNNouypzemY9TkvTNG0WBfihDNpo= +github.com/splitio/go-split-commons/v4 v4.2.0/go.mod h1:mzanM00PV8t1FL6IHc2UXepIH2z79d49ArZ2LoJHGrY= +github.com/splitio/go-toolkit/v5 v5.2.2 h1:VHSJoIH9tsRt2cCzGKN4WG3BoGCr0tCPZIl8APtJ4bw= +github.com/splitio/go-toolkit/v5 v5.2.2/go.mod h1:SYi/svhhtEgdMSb5tNcDcMjOSUH/7XVkvjp5dPL+nBE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/twmb/murmur3 v1.1.6 h1:mqrRot1BRxm+Yct+vavLMou2/iJt0tNVTTC0QoIjaZg= +github.com/twmb/murmur3 v1.1.6/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/splitio/client/client_test.go b/splitio/client/client_test.go index 12bcdc29..2b2400b6 100644 --- a/splitio/client/client_test.go +++ b/splitio/client/client_test.go @@ -9,6 +9,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "sync/atomic" "testing" "time" @@ -23,6 +24,7 @@ import ( "github.com/splitio/go-split-commons/v4/dtos" "github.com/splitio/go-split-commons/v4/healthcheck/application" 
"github.com/splitio/go-split-commons/v4/provisional" + "github.com/splitio/go-split-commons/v4/provisional/strategy" authMocks "github.com/splitio/go-split-commons/v4/service/mocks" "github.com/splitio/go-split-commons/v4/storage" "github.com/splitio/go-split-commons/v4/storage/inmemory" @@ -127,10 +129,11 @@ func getFactory() SplitFactory { cfg := conf.Default() cfg.LabelsEnabled = true logger := logging.NewLogger(nil) - impressionManager, _ := provisional.NewImpressionManager(commonsCfg.ManagerConfig{ - ImpressionsMode: cfg.ImpressionsMode, - OperationMode: cfg.OperationMode, - }, provisional.NewImpressionsCounter(), telemetryStorage) + + impressionObserver, _ := strategy.NewImpressionObserver(500) + impressionsCounter := strategy.NewImpressionsCounter() + impressionsStrategy := strategy.NewOptimizedImpl(impressionObserver, impressionsCounter, telemetryStorage, false) + impressionManager := provisional.NewImpressionManager(impressionsStrategy) return SplitFactory{ cfg: cfg, @@ -289,10 +292,11 @@ func TestClientPanicking(t *testing.T) { cfg := conf.Default() cfg.LabelsEnabled = true logger := logging.NewLogger(nil) - impressionManager, _ := provisional.NewImpressionManager(commonsCfg.ManagerConfig{ - ImpressionsMode: cfg.ImpressionsMode, - OperationMode: cfg.OperationMode, - }, provisional.NewImpressionsCounter(), telemetryMockedStorage) + + impressionObserver, _ := strategy.NewImpressionObserver(500) + impressionsCounter := strategy.NewImpressionsCounter() + impressionsStrategy := strategy.NewOptimizedImpl(impressionObserver, impressionsCounter, telemetryMockedStorage, false) + impressionManager := provisional.NewImpressionManager(impressionsStrategy) factory := SplitFactory{ cfg: cfg, @@ -448,6 +452,7 @@ func compareListener(ilTest map[string]interface{}, f string, k string, l string func getClientForListener() SplitClient { cfg := conf.Default() cfg.LabelsEnabled = true + logger := logging.NewLogger(nil) impTest := &ImpressionListenerTest{} @@ -461,11 +466,12 @@ func getClientForListener() SplitClient { RecordLatencyCall: func(method string, latency time.Duration) {}, } impressionStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, make(chan string, 1), logger, telemetryMockedStorage) - impressionManager, _ := provisional.NewImpressionManager(commonsCfg.ManagerConfig{ - ImpressionsMode: cfg.ImpressionsMode, - OperationMode: cfg.OperationMode, - ListenerEnabled: true, - }, provisional.NewImpressionsCounter(), telemetryMockedStorage) + + impressionObserver, _ := strategy.NewImpressionObserver(500) + impressionsCounter := strategy.NewImpressionsCounter() + impressionsStrategy := strategy.NewOptimizedImpl(impressionObserver, impressionsCounter, telemetryMockedStorage, true) + impressionManager := provisional.NewImpressionManager(impressionsStrategy) + factory := &SplitFactory{ cfg: cfg, storages: sdkStorages{ @@ -695,15 +701,14 @@ func TestBlockUntilReadyRedis(t *testing.T) { sdkConf.OperationMode = conf.RedisConsumer factory, _ := NewSplitFactory("something", sdkConf) - if !factory.IsReady() { t.Error("Factory should be ready immediately") } - client := factory.Client() if !client.factory.IsReady() { t.Error("Client should be ready immediately") } + err := client.BlockUntilReady(1) if err != nil { t.Error("Error was not expected") @@ -1135,7 +1140,7 @@ func isInvalidImpression(client SplitClient, key string, feature string, treatme func TestClient(t *testing.T) { cfg := conf.Default() - + cfg.ImpressionsMode = commonsCfg.ImpressionsModeDebug cfg.LabelsEnabled = 
true logger := logging.NewLogger(nil) @@ -1194,10 +1199,10 @@ func TestClient(t *testing.T) { RecordLatencyCall: func(method string, latency time.Duration) {}, } - impressionManager, _ := provisional.NewImpressionManager(commonsCfg.ManagerConfig{ - ImpressionsMode: commonsCfg.ImpressionsModeDebug, - OperationMode: cfg.OperationMode, - }, provisional.NewImpressionsCounter(), mockedTelemetryStorage) + impressionObserver, _ := strategy.NewImpressionObserver(500) + impressionsStrategy := strategy.NewDebugImpl(impressionObserver, true) + impressionManager := provisional.NewImpressionManager(impressionsStrategy) + factory := &SplitFactory{cfg: cfg, impressionManager: impressionManager} client := SplitClient{ evaluator: evaluator, @@ -1305,7 +1310,7 @@ func TestLocalhostModeYAML(t *testing.T) { expectedTreatmentAndConfig(resultTreatmentsWithConfig["other_feature"], "control", "", t) } -func getRedisConfWithIP(IPAddressesEnabled bool) *predis.PrefixedRedisClient { +func getRedisConfWithIP(IPAddressesEnabled bool) (*predis.PrefixedRedisClient, *SplitClient) { // Create prefixed client for adding Split prefixedClient, _ := redis.NewRedisClient(&commonsCfg.RedisConfig{ Host: "localhost", @@ -1317,7 +1322,7 @@ func getRedisConfWithIP(IPAddressesEnabled bool) *predis.PrefixedRedisClient { raw, err := json.Marshal(*valid) if err != nil { - return nil + return nil, nil } prefixedClient.Set("SPLITIO.split.valid", raw, 0) prefixedClient.Set("SPLITIO.splits.till", 1494593336752, 0) @@ -1346,7 +1351,7 @@ func getRedisConfWithIP(IPAddressesEnabled bool) *predis.PrefixedRedisClient { client.Treatment("user1", "valid", nil) client.Track("user1", "my-traffic", "my-event", nil, nil) - return prefixedClient + return prefixedClient, client } func deleteDataGenerated(prefixedClient *predis.PrefixedRedisClient) { @@ -1356,12 +1361,25 @@ func deleteDataGenerated(prefixedClient *predis.PrefixedRedisClient) { } func TestRedisClientWithIPDisabled(t *testing.T) { - prefixedClient := getRedisConfWithIP(false) + prefixedClient, splitClient := getRedisConfWithIP(false) + + // Grabs created event + resEvent, _ := prefixedClient.LRange("SPLITIO.events", 0, 1) + event := make(map[string]map[string]interface{}) + json.Unmarshal([]byte(resEvent[0]), &event) + metadata := event["m"] + // Checks if metadata was created with "NA" values + if metadata["i"] != "NA" || metadata["n"] != "NA" { + t.Error("Instance Name and Machine IP should have 'NA' values") + } + + splitClient.Destroy() + // Grabs created impression resImpression, _ := prefixedClient.LRange("SPLITIO.impressions", 0, 1) impression := make(map[string]map[string]interface{}) json.Unmarshal([]byte(resImpression[0]), &impression) - metadata := impression["m"] + metadata = impression["m"] // Checks if metadata was created with "NA" values if metadata["i"] != "NA" || metadata["n"] != "NA" { t.Error("Instance Name and Machine IP should have 'NA' values") @@ -1371,26 +1389,29 @@ func TestRedisClientWithIPDisabled(t *testing.T) { t.Error("InstanceName should be 'NA") } + deleteDataGenerated(prefixedClient) +} + +func TestRedisClientWithIPEnabled(t *testing.T) { + prefixedClient, splitClient := getRedisConfWithIP(true) + // Grabs created event resEvent, _ := prefixedClient.LRange("SPLITIO.events", 0, 1) event := make(map[string]map[string]interface{}) json.Unmarshal([]byte(resEvent[0]), &event) - metadata = event["m"] + metadata := event["m"] // Checks if metadata was created with "NA" values - if metadata["i"] != "NA" || metadata["n"] != "NA" { - t.Error("Instance Name and 
Machine IP should have 'NA' values") + if metadata["i"] == "NA" || metadata["n"] == "NA" { + t.Error("Instance Name and Machine IP should not have 'NA' values") } - deleteDataGenerated(prefixedClient) -} + splitClient.Destroy() -func TestRedisClientWithIPEnabled(t *testing.T) { - prefixedClient := getRedisConfWithIP(true) // Grabs created impression resImpression, _ := prefixedClient.LRange("SPLITIO.impressions", 0, 1) impression := make(map[string]map[string]interface{}) json.Unmarshal([]byte(resImpression[0]), &impression) - metadata := impression["m"] + metadata = impression["m"] // Checks if metadata was created with "NA" values if metadata["i"] == "NA" || metadata["n"] == "NA" { t.Error("Instance Name and Machine IP should not have 'NA' values") @@ -1400,16 +1421,6 @@ func TestRedisClientWithIPEnabled(t *testing.T) { t.Error("InstanceName should not be 'NA") } - // Grabs created event - resEvent, _ := prefixedClient.LRange("SPLITIO.events", 0, 1) - event := make(map[string]map[string]interface{}) - json.Unmarshal([]byte(resEvent[0]), &event) - metadata = event["m"] - // Checks if metadata was created with "NA" values - if metadata["i"] == "NA" || metadata["n"] == "NA" { - t.Error("Instance Name and Machine IP should not have 'NA' values") - } - deleteDataGenerated(prefixedClient) } @@ -1420,6 +1431,8 @@ func getInMemoryClientWithIP(IPAddressesEnabled bool, ts *httptest.Server) Split cfg.IPAddressesEnabled = IPAddressesEnabled cfg.Advanced.EventsURL = ts.URL cfg.Advanced.SdkURL = ts.URL + cfg.Advanced.TelemetryServiceURL = ts.URL + cfg.Advanced.AuthServiceURL = ts.URL cfg.Advanced.ImpressionListener = &ImpressionListenerTest{} cfg.TaskPeriods.ImpressionSync = 1 cfg.TaskPeriods.EventsSync = 1 @@ -1614,7 +1627,7 @@ func TestClientOptimized(t *testing.T) { } switch v["f"] { case "DEMO_MURMUR2": - if v["rc"].(float64) != 3 { + if v["rc"].(float64) != 2 { t.Error("Wrong rc") } if int64(v["m"].(float64)) != util.TruncateTimeFrame(time.Now().UTC().UnixNano()) { @@ -1643,6 +1656,8 @@ func TestClientOptimized(t *testing.T) { cfg.LabelsEnabled = true cfg.Advanced.EventsURL = ts.URL cfg.Advanced.SdkURL = ts.URL + cfg.Advanced.TelemetryServiceURL = ts.URL + cfg.Advanced.AuthServiceURL = ts.URL cfg.Advanced.ImpressionListener = impTest factory, _ := NewSplitFactory("test", cfg) @@ -1683,6 +1698,116 @@ func TestClientOptimized(t *testing.T) { } } +func TestClientNone(t *testing.T) { + var isDestroyCalled int64 + var splitsMock, _ = ioutil.ReadFile("../../testdata/splits_mock_2.json") + + postChannel := make(chan string, 1) + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/splitChanges": + fmt.Fprintln(w, string(splitsMock)) + return + case "/testImpressions/bulk": + t.Error("Should not post impressions") + case "/testImpressions/count": + fmt.Fprintln(w, "ok") + if atomic.LoadInt64(&isDestroyCalled) == 1 { + rBody, _ := ioutil.ReadAll(r.Body) + + var dataInPost map[string][]map[string]interface{} + err := json.Unmarshal(rBody, &dataInPost) + if err != nil { + t.Error(err) + return + } + + for _, v := range dataInPost["pf"] { + if int64(v["m"].(float64)) != util.TruncateTimeFrame(time.Now().UTC().UnixNano()) { + t.Error("Wrong timeFrame") + } + switch v["f"] { + case "DEMO_MURMUR2": + if v["rc"].(float64) != 4 { + t.Error("Wrong rc") + } + if int64(v["m"].(float64)) != util.TruncateTimeFrame(time.Now().UTC().UnixNano()) { + t.Error("Wrong timeFrame") + } + case "DEMO_MURMUR": + if v["rc"].(float64) != 1 { + t.Error("Wrong rc") 
+ } + } + } + } + case "/keys/ss": + rBody, _ := ioutil.ReadAll(r.Body) + + var uniques dtos.Uniques + err := json.Unmarshal(rBody, &uniques) + if err != nil { + t.Error(err) + return + } + + if len(uniques.Keys) != 2 { + t.Error("Length should be 2") + } + for _, key := range uniques.Keys { + if key.Feature == "DEMO_MURMUR2" && len(key.Keys) != 2 { + t.Error("Length should be 2") + } + if key.Feature == "DEMO_MURMUR" && len(key.Keys) != 1 { + t.Error("Length should be 1") + } + } + + postChannel <- "finished" + case "/events/bulk": + fmt.Fprintln(w, "ok") + case "/segmentChanges": + fallthrough + default: + fmt.Fprintln(w, "ok") + } + })) + defer ts.Close() + + impTest := &ImpressionListenerTest{} + cfg := conf.Default() + cfg.LabelsEnabled = true + cfg.Advanced.StreamingEnabled = false + cfg.Advanced.EventsURL = ts.URL + cfg.Advanced.SdkURL = ts.URL + cfg.Advanced.TelemetryServiceURL = ts.URL + cfg.Advanced.ImpressionListener = impTest + cfg.ImpressionsMode = commonsCfg.ImpressionsModeNone + + factory, _ := NewSplitFactory("test", cfg) + client := factory.Client() + client.BlockUntilReady(2) + + // Calls treatments to generate one valid impression + time.Sleep(300 * time.Millisecond) // Let's wait until first call of recorders have finished + client.Treatment("user1", "DEMO_MURMUR2", nil) + client.Treatment("user1", "DEMO_MURMUR2", nil) + client.Treatments("user1", []string{"DEMO_MURMUR2", "DEMO_MURMUR"}, nil) + client.Treatment("user2", "DEMO_MURMUR2", nil) + + atomic.AddInt64(&isDestroyCalled, 1) + client.Destroy() + + select { + case <-postChannel: + return + case <-time.After(4 * time.Second): + t.Error("The test couldn't send impressions to check headers") + return + } +} + func TestClientDebug(t *testing.T) { var isDestroyCalled = false var splitsMock, _ = ioutil.ReadFile("../../testdata/splits_mock_2.json") @@ -1734,6 +1859,8 @@ func TestClientDebug(t *testing.T) { cfg.LabelsEnabled = true cfg.Advanced.EventsURL = ts.URL cfg.Advanced.SdkURL = ts.URL + cfg.Advanced.TelemetryServiceURL = ts.URL + cfg.Advanced.AuthServiceURL = ts.URL cfg.Advanced.ImpressionListener = impTest cfg.ImpressionsMode = "Debug" @@ -1987,3 +2114,245 @@ func TestTelemetryRedis(t *testing.T) { deleteDataGenerated(prefixedClient) factory.Destroy() } + +func TestClientNoneRedis(t *testing.T) { + redisConfig := &commonsCfg.RedisConfig{ + Host: "localhost", + Port: 6379, + Password: "", + Prefix: "test-prefix-m", + } + + prefixedClient, _ := redis.NewRedisClient(redisConfig, logging.NewLogger(&logging.LoggerOptions{})) + raw, _ := json.Marshal(*valid) + prefixedClient.Set("SPLITIO.split.valid", raw, 0) + raw, _ = json.Marshal(*noConfig) + prefixedClient.Set("SPLITIO.split.noConfig", raw, 0) + + impTest := &ImpressionListenerTest{} + cfg := conf.Default() + cfg.LabelsEnabled = true + cfg.Advanced.ImpressionListener = impTest + cfg.ImpressionsMode = commonsCfg.ImpressionsModeNone + cfg.OperationMode = conf.RedisConsumer + cfg.Redis = *redisConfig + + factory, _ := NewSplitFactory("test", cfg) + client := factory.Client() + client.BlockUntilReady(2) + + // Calls treatments to generate one valid impression + time.Sleep(300 * time.Millisecond) // Let's wait until first call of recorders have finished + client.Treatment("user1", "valid", nil) + client.Treatment("user2", "valid", nil) + client.Treatment("user3", "valid", nil) + client.Treatment("user1", "valid", nil) + client.Treatment("user2", "valid", nil) + client.Treatment("user3", "valid", nil) + client.Treatment("user3", "noConfig", nil) + client.Treatment("user3", 
"noConfig", nil) + client.Destroy() + + // Validate unique keys + uniques, _ := prefixedClient.LRange("SPLITIO.uniquekeys", 0, -1) + var uniquesDto []dtos.Key + _ = json.Unmarshal([]byte(uniques[0]), &uniquesDto) + + if len(uniquesDto) != 2 { + t.Errorf("Lenght should be 2, Actual %d", len(uniquesDto)) + } + + for _, unique := range uniquesDto { + if unique.Feature == "valid" && len(unique.Keys) != 3 { + t.Error("Keys should be 3") + } + if unique.Feature == "noConfig" && len(unique.Keys) != 1 { + t.Error("Keys should be 1") + } + } + + // Validate impression counts + impressionscount, _ := prefixedClient.HGetAll("SPLITIO.impressions.count") + + for key, count := range impressionscount { + if strings.HasPrefix(key, "valid::") && count != "6" { + t.Error("Expected: 6. actual: " + count) + } + if strings.HasPrefix(key, "noConfig::") && count != "2" { + t.Error("Expected: 2. actual: " + count) + } + } + + // Validate that impressions doesn't exist + exist, _ := prefixedClient.Exists("SPLITIO.impressions") + if exist != 0 { + t.Error("SPLITIO.impressions should not exist") + } + + // Clean redis + keys, _ := prefixedClient.Keys("SPLITIO*") + for _, k := range keys { + prefixedClient.Del(k) + } +} + +func TestClientOptimizedRedis(t *testing.T) { + redisConfig := &commonsCfg.RedisConfig{ + Host: "localhost", + Port: 6379, + Password: "", + Prefix: "test-prefix-m", + } + + prefixedClient, _ := redis.NewRedisClient(redisConfig, logging.NewLogger(&logging.LoggerOptions{})) + raw, _ := json.Marshal(*valid) + prefixedClient.Set("SPLITIO.split.valid", raw, 0) + raw, _ = json.Marshal(*noConfig) + prefixedClient.Set("SPLITIO.split.noConfig", raw, 0) + + impTest := &ImpressionListenerTest{} + cfg := conf.Default() + cfg.LabelsEnabled = true + cfg.Advanced.ImpressionListener = impTest + cfg.ImpressionsMode = commonsCfg.ImpressionsModeOptimized + cfg.OperationMode = conf.RedisConsumer + cfg.Redis = *redisConfig + + factory, _ := NewSplitFactory("test", cfg) + client := factory.Client() + client.BlockUntilReady(2) + + // Calls treatments to generate one valid impression + time.Sleep(300 * time.Millisecond) // Let's wait until first call of recorders have finished + client.Treatment("user1", "valid", nil) + client.Treatment("user2", "valid", nil) + client.Treatment("user3", "valid", nil) + client.Treatment("user1", "valid", nil) + client.Treatment("user2", "valid", nil) + client.Treatment("user3", "valid", nil) + client.Treatment("user3", "noConfig", nil) + client.Treatment("user3", "noConfig", nil) + client.Destroy() + + // Validate impressions + impressions, _ := prefixedClient.LRange("SPLITIO.impressions", 0, -1) + + if len(impressions) != 4 { + t.Error("Impression length shold be 4") + } + + for _, imp := range impressions { + var imprObject dtos.ImpressionQueueObject + _ = json.Unmarshal([]byte(imp), &imprObject) + + if imprObject.Impression.KeyName == "user1" && imprObject.Impression.FeatureName == "valid" && imprObject.Impression.Pt != 0 { + t.Error("Pt should be 0.") + } + if imprObject.Impression.KeyName == "user2" && imprObject.Impression.FeatureName == "valid" && imprObject.Impression.Pt != 0 { + t.Error("Pt should be 0.") + } + if imprObject.Impression.KeyName == "user3" && imprObject.Impression.FeatureName == "valid" && imprObject.Impression.Pt != 0 { + t.Error("Pt should be 0.") + } + if imprObject.Impression.KeyName == "user3" && imprObject.Impression.FeatureName == "noConfig" && imprObject.Impression.Pt != 0 { + t.Error("Pt should be 0.") + } + } + + // Validate impression counts + 
impressionscount, _ := prefixedClient.HGetAll("SPLITIO.impressions.count") + + for key, count := range impressionscount { + if strings.HasPrefix(key, "valid::") && count != "3" { + t.Error("Expected: 3. actual: " + count) + } + if strings.HasPrefix(key, "noConfig::") && count != "1" { + t.Error("Expected: 1. actual: " + count) + } + } + + // Validate that uniquekeys doesn't exist + exist, _ := prefixedClient.Exists("SPLITIO.uniquekeys") + if exist != 0 { + t.Error("SPLITIO.uniquekeys should not exist") + } + + // Clean redis + keys, _ := prefixedClient.Keys("SPLITIO*") + for _, k := range keys { + prefixedClient.Del(k) + } +} + +func TestClientDebugRedis(t *testing.T) { + redisConfig := &commonsCfg.RedisConfig{ + Host: "localhost", + Port: 6379, + Password: "", + Prefix: "test-prefix-m", + } + + prefixedClient, _ := redis.NewRedisClient(redisConfig, logging.NewLogger(&logging.LoggerOptions{})) + raw, _ := json.Marshal(*valid) + prefixedClient.Set("SPLITIO.split.valid", raw, 0) + raw, _ = json.Marshal(*noConfig) + prefixedClient.Set("SPLITIO.split.noConfig", raw, 0) + + impTest := &ImpressionListenerTest{} + cfg := conf.Default() + cfg.LabelsEnabled = true + cfg.Advanced.ImpressionListener = impTest + cfg.ImpressionsMode = commonsCfg.ImpressionsModeDebug + cfg.OperationMode = conf.RedisConsumer + cfg.Redis = *redisConfig + + factory, _ := NewSplitFactory("test", cfg) + client := factory.Client() + client.BlockUntilReady(2) + + // Calls treatments to generate one valid impression + time.Sleep(300 * time.Millisecond) // Let's wait until first call of recorders have finished + client.Treatment("user1", "valid", nil) + client.Treatment("user2", "valid", nil) + client.Treatment("user3", "valid", nil) + client.Treatment("user1", "valid", nil) + client.Treatment("user2", "valid", nil) + client.Treatment("user3", "valid", nil) + client.Treatment("user3", "noConfig", nil) + client.Treatment("user3", "noConfig", nil) + client.Destroy() + + // Validate impressions + impressions, _ := prefixedClient.LRange("SPLITIO.impressions", 0, -1) + if len(impressions) != 8 { + t.Errorf("Impression length should be 8. Actual %d", len(impressions)) + } + + // Validate impression counts + impressionscount, _ := prefixedClient.HGetAll("SPLITIO.impressions.count") + + for key, count := range impressionscount { + if strings.HasPrefix(key, "valid::") && count != "6" { + t.Error("Expected: 6. actual: " + count) + } + if strings.HasPrefix(key, "noConfig::") && count != "2" { + t.Error("Expected: 2. 
actual: " + count) + } + } + + // Validate that uniquekeys doesn't exist + exist, _ := prefixedClient.Exists("SPLITIO.uniquekeys") + if exist != 0 { + t.Error("SPLITIO.uniquekeys should not exist") + } + exist, _ = prefixedClient.Exists("SPLITIO.impressions.count") + if exist != 0 { + t.Error("SPLITIO.impressions.count should not exist") + } + + // Clean redis + keys, _ := prefixedClient.Keys("SPLITIO*") + for _, k := range keys { + prefixedClient.Del(k) + } +} diff --git a/splitio/client/factory.go b/splitio/client/factory.go index 350c7b93..94e6d8ae 100644 --- a/splitio/client/factory.go +++ b/splitio/client/factory.go @@ -19,9 +19,11 @@ import ( "github.com/splitio/go-split-commons/v4/dtos" "github.com/splitio/go-split-commons/v4/healthcheck/application" "github.com/splitio/go-split-commons/v4/provisional" + "github.com/splitio/go-split-commons/v4/provisional/strategy" "github.com/splitio/go-split-commons/v4/service/api" "github.com/splitio/go-split-commons/v4/service/local" "github.com/splitio/go-split-commons/v4/storage" + "github.com/splitio/go-split-commons/v4/storage/filter" "github.com/splitio/go-split-commons/v4/storage/inmemory" "github.com/splitio/go-split-commons/v4/storage/inmemory/mutexmap" "github.com/splitio/go-split-commons/v4/storage/inmemory/mutexqueue" @@ -46,14 +48,27 @@ const ( sdkInitializationFailed = -1 ) +const ( + bfExpectedElemenets = 10000000 + bfFalsePositiveProbability = 0.01 + bfCleaningPeriod = 86400 // 24 hours + uniqueKeysPeriodTaskInMemory = 900 // 15 min + uniqueKeysPeriodTaskRedis = 300 // 5 min + impressionsCountPeriodTaskInMemory = 1800 // 30 min + impressionsCountPeriodTaskRedis = 300 // 5 min + impressionsBulkSizeRedis = 100 +) + type sdkStorages struct { splits storage.SplitStorageConsumer segments storage.SegmentStorageConsumer + impressionsConsumer storage.ImpressionStorageConsumer impressions storage.ImpressionStorageProducer events storage.EventStorageProducer initTelemetry storage.TelemetryConfigProducer runtimeTelemetry storage.TelemetryRuntimeProducer evaluationTelemetry storage.TelemetryEvaluationProducer + impressionsCount storage.ImpressionsCountProducer } // SplitFactory struct is responsible for instantiating and storing instances of client and manager. 
@@ -136,6 +151,11 @@ func (f *SplitFactory) initializationInMemory(readyChannel chan int) { } } +func (f *SplitFactory) initializationRedis() { + go f.syncManager.Start() + f.broadcastReadiness(sdkStatusReady, make([]string, 0)) +} + // recordInitTelemetry In charge of recording init stats from redis and memory func (f *SplitFactory) recordInitTelemetry(tags []string, currentFactories map[string]int64) { f.logger.Debug("Sending init telemetry") @@ -156,12 +176,9 @@ func (f *SplitFactory) recordInitTelemetry(tags []string, currentFactories map[s AuthServiceURL: f.cfg.Advanced.AuthServiceURL, StreamingServiceURL: f.cfg.Advanced.StreamingServiceURL, }, - TaskPeriods: config.TaskPeriods(f.cfg.TaskPeriods), - ManagerConfig: config.ManagerConfig{ - OperationMode: f.cfg.OperationMode, - ImpressionsMode: f.cfg.ImpressionsMode, - ListenerEnabled: f.cfg.Advanced.ImpressionListener != nil, - }, + TaskPeriods: config.TaskPeriods(f.cfg.TaskPeriods), + ImpressionsMode: f.cfg.ImpressionsMode, + ListenerEnabled: f.cfg.Advanced.ImpressionListener != nil, }, time.Now().UTC().Sub(f.startTime).Milliseconds(), currentFactories, @@ -252,11 +269,6 @@ func (f *SplitFactory) Destroy() { if f.storages.runtimeTelemetry != nil { f.storages.runtimeTelemetry.RecordSessionLength(int64(time.Since(f.startTime) * time.Millisecond)) } - - if f.cfg.OperationMode == conf.RedisConsumer { - return - } - f.syncManager.Stop() } @@ -293,36 +305,41 @@ func setupInMemoryFactory( return nil, err } - managerConfig := config.ManagerConfig{ - ImpressionsMode: cfg.ImpressionsMode, - OperationMode: cfg.OperationMode, - ListenerEnabled: cfg.Advanced.ImpressionListener != nil, - } - var dummyHC = &application.Dummy{} splitAPI := api.NewSplitAPI(apikey, advanced, logger, metadata) workers := synchronizer.Workers{ - SplitFetcher: split.NewSplitFetcher(splitsStorage, splitAPI.SplitFetcher, logger, telemetryStorage, dummyHC), - SegmentFetcher: segment.NewSegmentFetcher(splitsStorage, segmentsStorage, splitAPI.SegmentFetcher, logger, telemetryStorage, dummyHC), - EventRecorder: event.NewEventRecorderSingle(eventsStorage, splitAPI.EventRecorder, logger, metadata, telemetryStorage), - ImpressionRecorder: impression.NewRecorderSingle(impressionsStorage, splitAPI.ImpressionRecorder, logger, metadata, managerConfig, telemetryStorage), - TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryStorage, splitAPI.TelemetryRecorder, splitsStorage, segmentsStorage, logger, metadata, telemetryStorage), + SplitFetcher: split.NewSplitFetcher(splitsStorage, splitAPI.SplitFetcher, logger, telemetryStorage, dummyHC), + SegmentFetcher: segment.NewSegmentFetcher(splitsStorage, segmentsStorage, splitAPI.SegmentFetcher, logger, telemetryStorage, dummyHC), + EventRecorder: event.NewEventRecorderSingle(eventsStorage, splitAPI.EventRecorder, logger, metadata, telemetryStorage), + TelemetryRecorder: telemetry.NewTelemetrySynchronizer(telemetryStorage, splitAPI.TelemetryRecorder, splitsStorage, segmentsStorage, logger, metadata, telemetryStorage), } splitTasks := synchronizer.SplitTasks{ - SplitSyncTask: tasks.NewFetchSplitsTask(workers.SplitFetcher, cfg.TaskPeriods.SplitSync, logger), - SegmentSyncTask: tasks.NewFetchSegmentsTask(workers.SegmentFetcher, cfg.TaskPeriods.SegmentSync, advanced.SegmentWorkers, advanced.SegmentQueueSize, logger), - EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, cfg.TaskPeriods.EventsSync, logger), - ImpressionSyncTask: tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, 
cfg.TaskPeriods.ImpressionSync, logger, advanced.ImpressionsBulkSize), - TelemetrySyncTask: tasks.NewRecordTelemetryTask(workers.TelemetryRecorder, cfg.TaskPeriods.TelemetrySync, logger), + SplitSyncTask: tasks.NewFetchSplitsTask(workers.SplitFetcher, cfg.TaskPeriods.SplitSync, logger), + SegmentSyncTask: tasks.NewFetchSegmentsTask(workers.SegmentFetcher, cfg.TaskPeriods.SegmentSync, advanced.SegmentWorkers, advanced.SegmentQueueSize, logger), + EventSyncTask: tasks.NewRecordEventsTask(workers.EventRecorder, advanced.EventsBulkSize, cfg.TaskPeriods.EventsSync, logger), + TelemetrySyncTask: tasks.NewRecordTelemetryTask(workers.TelemetryRecorder, cfg.TaskPeriods.TelemetrySync, logger), + } + + storages := sdkStorages{ + splits: splitsStorage, + events: eventsStorage, + impressionsConsumer: impressionsStorage, + impressions: impressionsStorage, + segments: segmentsStorage, + initTelemetry: telemetryStorage, + evaluationTelemetry: telemetryStorage, + runtimeTelemetry: telemetryStorage, + } + + if cfg.ImpressionsMode == "" { + cfg.ImpressionsMode = config.ImpressionsModeOptimized } - var impressionsCounter *provisional.ImpressionsCounter - if cfg.ImpressionsMode == config.ImpressionsModeOptimized { - impressionsCounter = provisional.NewImpressionsCounter() - workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, telemetryStorage) - splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger) + + impressionManager, err := buildImpressionManager(cfg, advanced, logger, true, &splitTasks, &workers, storages, metadata, splitAPI, nil) + if err != nil { + return nil, err } - impressionManager, _ := provisional.NewImpressionManager(managerConfig, impressionsCounter, telemetryStorage) syncImpl := synchronizer.NewSynchronizer( advanced, @@ -352,27 +369,19 @@ func setupInMemoryFactory( } splitFactory := SplitFactory{ - startTime: time.Now().UTC(), - apikey: apikey, - cfg: cfg, - metadata: metadata, - logger: logger, - operationMode: conf.InMemoryStandAlone, - storages: sdkStorages{ - splits: splitsStorage, - events: eventsStorage, - impressions: impressionsStorage, - segments: segmentsStorage, - initTelemetry: telemetryStorage, - evaluationTelemetry: telemetryStorage, - runtimeTelemetry: telemetryStorage, - }, + startTime: time.Now().UTC(), + apikey: apikey, + cfg: cfg, + metadata: metadata, + logger: logger, + operationMode: conf.InMemoryStandAlone, + storages: storages, readinessSubscriptors: make(map[int]chan int), syncManager: syncManager, telemetrySync: workers.TelemetryRecorder, + impressionManager: impressionManager, } splitFactory.status.Store(sdkStatusInitializing) - splitFactory.impressionManager = impressionManager setFactory(splitFactory.apikey, splitFactory.logger) go splitFactory.initializationInMemory(readyChannel) @@ -388,15 +397,49 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L } telemetryStorage := redis.NewTelemetryStorage(redisClient, logger, metadata) + runtimeTelemetry := mocks.MockTelemetryStorage{ + RecordSyncLatencyCall: func(resource int, latency time.Duration) {}, + RecordImpressionsStatsCall: func(dataType int, count int64) {}, + RecordSessionLengthCall: func(session int64) {}, + } + inMememoryFullQueue := make(chan string, 2) // Size 2: So that it's able to accept one event from each resource simultaneously. 
+ impressionStorage := mutexqueue.NewMQImpressionsStorage(cfg.Advanced.ImpressionsQueueSize, inMememoryFullQueue, logger, runtimeTelemetry) storages := sdkStorages{ splits: redis.NewSplitStorage(redisClient, logger), segments: redis.NewSegmentStorage(redisClient, logger), - impressions: redis.NewImpressionStorage(redisClient, metadata, logger), + impressionsConsumer: impressionStorage, + impressions: impressionStorage, events: redis.NewEventsStorage(redisClient, metadata, logger), initTelemetry: telemetryStorage, evaluationTelemetry: telemetryStorage, + impressionsCount: redis.NewImpressionsCountStorage(redisClient, logger), + runtimeTelemetry: runtimeTelemetry, + } + + splitTasks := synchronizer.SplitTasks{} + workers := synchronizer.Workers{} + advanced := config.AdvancedConfig{} + + if cfg.ImpressionsMode == "" { + cfg.ImpressionsMode = config.ImpressionsModeDebug + } + + impressionManager, err := buildImpressionManager(cfg, advanced, logger, false, &splitTasks, &workers, storages, metadata, nil, redis.NewImpressionStorage(redisClient, metadata, logger)) + if err != nil { + return nil, err } + syncImpl := synchronizer.NewSynchronizer( + advanced, + splitTasks, + workers, + logger, + inMememoryFullQueue, + nil, + ) + + syncManager := synchronizer.NewSynchronizerManagerRedis(syncImpl, logger) + factory := &SplitFactory{ startTime: time.Now().UTC(), apikey: apikey, @@ -407,22 +450,14 @@ func setupRedisFactory(apikey string, cfg *conf.SplitSdkConfig, logger logging.L storages: storages, readinessSubscriptors: make(map[int]chan int), telemetrySync: telemetry.NewSynchronizerRedis(telemetryStorage, logger), + impressionManager: impressionManager, + syncManager: syncManager, } factory.status.Store(sdkStatusInitializing) - impressionManager, _ := provisional.NewImpressionManager(config.ManagerConfig{ - OperationMode: cfg.OperationMode, - ImpressionsMode: cfg.ImpressionsMode, - ListenerEnabled: cfg.Advanced.ImpressionListener != nil, - }, nil, mocks.MockTelemetryStorage{ - RecordSyncLatencyCall: func(resource int, latency time.Duration) {}, - RecordImpressionsStatsCall: func(dataType int, count int64) {}, - }) - if err != nil { - return nil, err - } - factory.impressionManager = impressionManager setFactory(factory.apikey, factory.logger) - factory.broadcastReadiness(sdkStatusReady, make([]string, 0)) + + factory.initializationRedis() + return factory, nil } @@ -481,15 +516,12 @@ func setupLocalhostFactory( } splitFactory.status.Store(sdkStatusInitializing) - impressionManager, _ := provisional.NewImpressionManager(config.ManagerConfig{ - OperationMode: cfg.OperationMode, - ImpressionsMode: cfg.ImpressionsMode, - ListenerEnabled: cfg.Advanced.ImpressionListener != nil, - }, nil, telemetryStorage) + impressionObserver, err := strategy.NewImpressionObserver(500) if err != nil { return nil, err } - splitFactory.impressionManager = impressionManager + impressionsStrategy := strategy.NewDebugImpl(impressionObserver, cfg.Advanced.ImpressionListener != nil) + splitFactory.impressionManager = provisional.NewImpressionManager(impressionsStrategy) setFactory(splitFactory.apikey, splitFactory.logger) // Call fetching tasks as goroutine @@ -534,3 +566,78 @@ func newFactory(apikey string, cfg conf.SplitSdkConfig, logger logging.LoggerInt return splitFactory, nil } + +func buildImpressionManager( + cfg *conf.SplitSdkConfig, + advanced config.AdvancedConfig, + logger logging.LoggerInterface, + inMemory bool, + splitTasks *synchronizer.SplitTasks, + workers *synchronizer.Workers, + storages sdkStorages, + 
metadata dtos.Metadata, + splitAPI *api.SplitAPI, + impressionRedisStorage storage.ImpressionStorageProducer, +) (provisional.ImpressionManager, error) { + listenerEnabled := cfg.Advanced.ImpressionListener != nil + switch cfg.ImpressionsMode { + case config.ImpressionsModeNone: + impressionsCounter := strategy.NewImpressionsCounter() + filter := filter.NewBloomFilter(bfExpectedElemenets, bfFalsePositiveProbability) + uniqueKeysTracker := strategy.NewUniqueKeysTracker(filter) + + if inMemory { + workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, storages.runtimeTelemetry) + splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskInMemory) + splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(workers.TelemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskInMemory, logger) + } else { + telemetryRecorder := telemetry.NewSynchronizerRedis(storages.initTelemetry, logger) + impressionsCountRecorder := impressionscount.NewRecorderRedis(impressionsCounter, storages.impressionsCount, logger) + splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(impressionsCountRecorder, logger, impressionsCountPeriodTaskRedis) + splitTasks.UniqueKeysTask = tasks.NewRecordUniqueKeysTask(telemetryRecorder, uniqueKeysTracker, uniqueKeysPeriodTaskRedis, logger) + } + + splitTasks.CleanFilterTask = tasks.NewCleanFilterTask(filter, logger, bfCleaningPeriod) + impressionsStrategy := strategy.NewNoneImpl(impressionsCounter, uniqueKeysTracker, listenerEnabled) + + return provisional.NewImpressionManager(impressionsStrategy), nil + case config.ImpressionsModeDebug: + if inMemory { + workers.ImpressionRecorder = impression.NewRecorderSingle(storages.impressionsConsumer, splitAPI.ImpressionRecorder, logger, metadata, cfg.ImpressionsMode, storages.runtimeTelemetry) + splitTasks.ImpressionSyncTask = tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, cfg.TaskPeriods.ImpressionSync, logger, advanced.ImpressionsBulkSize) + } else { + workers.ImpressionRecorder = impression.NewRecorderRedis(storages.impressionsConsumer, impressionRedisStorage, logger) + splitTasks.ImpressionSyncTask = tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, cfg.TaskPeriods.ImpressionSync, logger, impressionsBulkSizeRedis) + } + + impressionObserver, err := strategy.NewImpressionObserver(500) + if err != nil { + return nil, err + } + impressionsStrategy := strategy.NewDebugImpl(impressionObserver, listenerEnabled) + + return provisional.NewImpressionManager(impressionsStrategy), nil + default: + impressionsCounter := strategy.NewImpressionsCounter() + + if inMemory { + workers.ImpressionsCountRecorder = impressionscount.NewRecorderSingle(impressionsCounter, splitAPI.ImpressionRecorder, metadata, logger, storages.runtimeTelemetry) + workers.ImpressionRecorder = impression.NewRecorderSingle(storages.impressionsConsumer, splitAPI.ImpressionRecorder, logger, metadata, cfg.ImpressionsMode, storages.runtimeTelemetry) + splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskInMemory) + splitTasks.ImpressionSyncTask = tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, cfg.TaskPeriods.ImpressionSync, logger, advanced.ImpressionsBulkSize) + } else { + workers.ImpressionsCountRecorder = impressionscount.NewRecorderRedis(impressionsCounter, 
storages.impressionsCount, logger) + workers.ImpressionRecorder = impression.NewRecorderRedis(storages.impressionsConsumer, impressionRedisStorage, logger) + splitTasks.ImpressionsCountSyncTask = tasks.NewRecordImpressionsCountTask(workers.ImpressionsCountRecorder, logger, impressionsCountPeriodTaskRedis) + splitTasks.ImpressionSyncTask = tasks.NewRecordImpressionsTask(workers.ImpressionRecorder, cfg.TaskPeriods.ImpressionSync, logger, impressionsBulkSizeRedis) + } + + impressionObserver, err := strategy.NewImpressionObserver(500) + if err != nil { + return nil, err + } + impressionsStrategy := strategy.NewOptimizedImpl(impressionObserver, impressionsCounter, storages.runtimeTelemetry, listenerEnabled) + + return provisional.NewImpressionManager(impressionsStrategy), nil + } +} diff --git a/splitio/client/input_validator_test.go b/splitio/client/input_validator_test.go index 3b5f21f9..e8952c22 100644 --- a/splitio/client/input_validator_test.go +++ b/splitio/client/input_validator_test.go @@ -14,6 +14,7 @@ import ( "github.com/splitio/go-split-commons/v4/dtos" "github.com/splitio/go-split-commons/v4/healthcheck/application" "github.com/splitio/go-split-commons/v4/provisional" + "github.com/splitio/go-split-commons/v4/provisional/strategy" "github.com/splitio/go-split-commons/v4/service/api" authMocks "github.com/splitio/go-split-commons/v4/service/mocks" "github.com/splitio/go-split-commons/v4/storage/inmemory/mutexmap" @@ -80,10 +81,12 @@ func getClient() SplitClient { RecordImpressionsStatsCall: func(dataType int, count int64) {}, RecordLatencyCall: func(method string, latency time.Duration) {}, } - impressionManager, _ := provisional.NewImpressionManager(spConf.ManagerConfig{ - ImpressionsMode: spConf.ImpressionsModeDebug, - OperationMode: cfg.OperationMode, - }, provisional.NewImpressionsCounter(), telemetryMockedStorage) + + impressionObserver, _ := strategy.NewImpressionObserver(500) + impressionsCounter := strategy.NewImpressionsCounter() + impressionsStrategy := strategy.NewOptimizedImpl(impressionObserver, impressionsCounter, telemetryMockedStorage, true) + impressionManager := provisional.NewImpressionManager(impressionsStrategy) + factory := &SplitFactory{cfg: cfg, impressionManager: impressionManager, storages: sdkStorages{ runtimeTelemetry: telemetryMockedStorage, @@ -130,7 +133,7 @@ func TestFactoryWithNilApiKey(t *testing.T) { t.Error("Should be error") } - expected := "Factory instantiation: you passed an empty apikey, apikey must be a non-empty string" + expected := "factory instantiation: you passed an empty apikey, apikey must be a non-empty string" if !mW.Matches(expected) { t.Error("Error is distinct from the expected one") } diff --git a/splitio/conf/sdkconf.go b/splitio/conf/sdkconf.go index 0a0c0005..e3a311b5 100644 --- a/splitio/conf/sdkconf.go +++ b/splitio/conf/sdkconf.go @@ -195,8 +195,10 @@ func validConfigRates(cfg *SplitSdkConfig) error { return fmt.Errorf("ImpressionSync must be >= %d. Actual is: %d", minImpressionSync, cfg.TaskPeriods.ImpressionSync) } } + case conf.ImpressionsModeNone: + return nil default: - fmt.Println(`You passed an invalid impressionsMode, impressionsMode should be one of the following values: 'debug' or 'optimized'. Defaulting to 'optimized' mode.`) + fmt.Println(`You passed an invalid impressionsMode, impressionsMode should be one of the following values: 'debug', 'optimized' or 'none'. 
Defaulting to 'optimized' mode.`) cfg.ImpressionsMode = conf.ImpressionsModeOptimized err := checkImpressionSync(cfg) if err != nil { @@ -211,7 +213,7 @@ func validConfigRates(cfg *SplitSdkConfig) error { return fmt.Errorf("TelemetrySync must be >= %d. Actual is: %d", minTelemetrySync, cfg.TaskPeriods.TelemetrySync) } if cfg.Advanced.SegmentWorkers <= 0 { - return errors.New("Number of workers for fetching segments MUST be greater than zero") + return errors.New("number of workers for fetching segments MUST be greater than zero") } return nil } @@ -221,7 +223,7 @@ func validConfigRates(cfg *SplitSdkConfig) error { func Normalize(apikey string, cfg *SplitSdkConfig) error { // Fail if no apikey is provided if apikey == "" && cfg.OperationMode != Localhost { - return errors.New("Factory instantiation: you passed an empty apikey, apikey must be a non-empty string") + return errors.New("factory instantiation: you passed an empty apikey, apikey must be a non-empty string") } // To keep the interface consistent with other sdks we accept "localhost" as an apikey, diff --git a/splitio/conf/sdkconf_test.go b/splitio/conf/sdkconf_test.go index ef6653d5..80509ac8 100644 --- a/splitio/conf/sdkconf_test.go +++ b/splitio/conf/sdkconf_test.go @@ -104,7 +104,7 @@ func TestValidRates(t *testing.T) { cfg = Default() cfg.Advanced.SegmentWorkers = 0 err = Normalize("asd", cfg) - if err == nil || err.Error() != "Number of workers for fetching segments MUST be greater than zero" { + if err == nil || err.Error() != "number of workers for fetching segments MUST be greater than zero" { t.Error("It should return err") } diff --git a/splitio/engine/grammar/matchers/allofset.go b/splitio/engine/grammar/matchers/allofset.go index aa456a82..8792309d 100644 --- a/splitio/engine/grammar/matchers/allofset.go +++ b/splitio/engine/grammar/matchers/allofset.go @@ -2,8 +2,9 @@ package matchers import ( "fmt" - "github.com/splitio/go-toolkit/v5/datastructures/set" "reflect" + + "github.com/splitio/go-toolkit/v5/datastructures/set" ) // ContainsAllOfSetMatcher matches if the set supplied to the getTreatment is a superset of the one in the split @@ -16,7 +17,7 @@ type ContainsAllOfSetMatcher struct { func (m *ContainsAllOfSetMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("AllOfSetMatcher: ", err) + m.logger.Warning(fmt.Sprintf("AllOfSetMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/anyofset.go b/splitio/engine/grammar/matchers/anyofset.go index 6cde2df5..d77cdfc3 100644 --- a/splitio/engine/grammar/matchers/anyofset.go +++ b/splitio/engine/grammar/matchers/anyofset.go @@ -1,6 +1,8 @@ package matchers import ( + "fmt" + "github.com/splitio/go-toolkit/v5/datastructures/set" ) @@ -14,7 +16,7 @@ type ContainsAnyOfSetMatcher struct { func (m *ContainsAnyOfSetMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("AnyOfSetMatcher: ", err) + m.logger.Warning(fmt.Sprintf("AnyOfSetMatcher %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/between.go b/splitio/engine/grammar/matchers/between.go index 3b8f1f78..202c9f2c 100644 --- a/splitio/engine/grammar/matchers/between.go +++ b/splitio/engine/grammar/matchers/between.go @@ -2,8 +2,9 @@ package matchers import ( "fmt" - 
"github.com/splitio/go-client/v6/splitio/engine/grammar/matchers/datatypes" "reflect" + + "github.com/splitio/go-client/v6/splitio/engine/grammar/matchers/datatypes" ) // BetweenMatcher will match if two numbers or two datetimes are equal @@ -18,7 +19,7 @@ type BetweenMatcher struct { func (m *BetweenMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingRaw, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("BetweenMatcher: Could not retrieve matching key. ", err) + m.logger.Warning(fmt.Sprintf("BetweenMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/contains.go b/splitio/engine/grammar/matchers/contains.go index 191662b8..35a5708e 100644 --- a/splitio/engine/grammar/matchers/contains.go +++ b/splitio/engine/grammar/matchers/contains.go @@ -1,6 +1,7 @@ package matchers import ( + "fmt" "strings" ) @@ -14,7 +15,7 @@ type ContainsStringMatcher struct { func (m *ContainsStringMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("ContainsAllOfSetMatcher: Error retrieving matching key") + m.logger.Warning(fmt.Sprintf("ContainsAllOfSetMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/endswith.go b/splitio/engine/grammar/matchers/endswith.go index bfc6f315..662e05ed 100644 --- a/splitio/engine/grammar/matchers/endswith.go +++ b/splitio/engine/grammar/matchers/endswith.go @@ -1,6 +1,7 @@ package matchers import ( + "fmt" "strings" ) @@ -14,7 +15,7 @@ type EndsWithMatcher struct { func (m *EndsWithMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("EndsWithMatcher: ", err) + m.logger.Warning(fmt.Sprintf("EndsWithMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/equalto.go b/splitio/engine/grammar/matchers/equalto.go index 5706634f..b4f13419 100644 --- a/splitio/engine/grammar/matchers/equalto.go +++ b/splitio/engine/grammar/matchers/equalto.go @@ -2,8 +2,9 @@ package matchers import ( "fmt" - "github.com/splitio/go-client/v6/splitio/engine/grammar/matchers/datatypes" "reflect" + + "github.com/splitio/go-client/v6/splitio/engine/grammar/matchers/datatypes" ) // EqualToMatcher will match if two numbers or two datetimes are equal @@ -18,7 +19,7 @@ func (m *EqualToMatcher) Match(key string, attributes map[string]interface{}, bu matchingRaw, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("EqualToMatcher: ", err) + m.logger.Warning(fmt.Sprintf("EqualToMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/equaltoset.go b/splitio/engine/grammar/matchers/equaltoset.go index 35959dd9..0b9ba519 100644 --- a/splitio/engine/grammar/matchers/equaltoset.go +++ b/splitio/engine/grammar/matchers/equaltoset.go @@ -1,6 +1,8 @@ package matchers import ( + "fmt" + "github.com/splitio/go-toolkit/v5/datastructures/set" ) @@ -14,7 +16,7 @@ type EqualToSetMatcher struct { func (m *EqualToSetMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("EqualToSetMatcher: ", err) + m.logger.Warning(fmt.Sprintf("EqualToSetMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/gtoet.go 
b/splitio/engine/grammar/matchers/gtoet.go index bc786c18..7470194d 100644 --- a/splitio/engine/grammar/matchers/gtoet.go +++ b/splitio/engine/grammar/matchers/gtoet.go @@ -1,6 +1,8 @@ package matchers import ( + "fmt" + "github.com/splitio/go-client/v6/splitio/engine/grammar/matchers/datatypes" ) @@ -15,7 +17,7 @@ type GreaterThanOrEqualToMatcher struct { func (m *GreaterThanOrEqualToMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingRaw, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("GreaterThanOrEqualToMatcher: ", err) + m.logger.Warning(fmt.Sprintf("GreaterThanOrEqualToMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/ltoet.go b/splitio/engine/grammar/matchers/ltoet.go index e52782f0..47fa4354 100644 --- a/splitio/engine/grammar/matchers/ltoet.go +++ b/splitio/engine/grammar/matchers/ltoet.go @@ -1,6 +1,8 @@ package matchers import ( + "fmt" + "github.com/splitio/go-client/v6/splitio/engine/grammar/matchers/datatypes" ) @@ -16,7 +18,7 @@ func (m *LessThanOrEqualToMatcher) Match(key string, attributes map[string]inter matchingRaw, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("LessThanOrEqualToMatcher: ", err) + m.logger.Warning(fmt.Sprintf("LessThanOrEqualToMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/partofset.go b/splitio/engine/grammar/matchers/partofset.go index 40263726..b5c0f2bf 100644 --- a/splitio/engine/grammar/matchers/partofset.go +++ b/splitio/engine/grammar/matchers/partofset.go @@ -1,6 +1,8 @@ package matchers import ( + "fmt" + "github.com/splitio/go-toolkit/v5/datastructures/set" ) @@ -14,7 +16,7 @@ type PartOfSetMatcher struct { func (m *PartOfSetMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("PartOfSetMatcher: ", err) + m.logger.Warning(fmt.Sprintf("PartOfSetMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/regex.go b/splitio/engine/grammar/matchers/regex.go index 3a3a68c8..dddd1a7c 100644 --- a/splitio/engine/grammar/matchers/regex.go +++ b/splitio/engine/grammar/matchers/regex.go @@ -1,6 +1,7 @@ package matchers import ( + "fmt" "reflect" "regexp" ) @@ -15,7 +16,7 @@ type RegexMatcher struct { func (m *RegexMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("RegexMatcher: ", err) + m.logger.Warning(fmt.Sprintf("RegexMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/startswith.go b/splitio/engine/grammar/matchers/startswith.go index ba1cae1d..a1971783 100644 --- a/splitio/engine/grammar/matchers/startswith.go +++ b/splitio/engine/grammar/matchers/startswith.go @@ -1,6 +1,7 @@ package matchers import ( + "fmt" "strings" ) @@ -14,7 +15,7 @@ type StartsWithMatcher struct { func (m *StartsWithMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("StartsWithMatcher: ", err) + m.logger.Warning(fmt.Sprintf("StartsWithMatcher: %s", err.Error())) return false } diff --git a/splitio/engine/grammar/matchers/whitelist.go b/splitio/engine/grammar/matchers/whitelist.go index 195e6429..05961bf3 100644 --- a/splitio/engine/grammar/matchers/whitelist.go +++ 
b/splitio/engine/grammar/matchers/whitelist.go @@ -1,6 +1,8 @@ package matchers import ( + "fmt" + "github.com/splitio/go-toolkit/v5/datastructures/set" ) @@ -14,7 +16,7 @@ type WhitelistMatcher struct { func (m *WhitelistMatcher) Match(key string, attributes map[string]interface{}, bucketingKey *string) bool { matchingKey, err := m.matchingKey(key, attributes) if err != nil { - m.logger.Error("WhitelistMatcher: ", err) + m.logger.Warning(fmt.Sprintf("WhitelistMatcher: %s", err.Error())) return false } diff --git a/splitio/version.go b/splitio/version.go index 5b4c6860..5125a4ad 100644 --- a/splitio/version.go +++ b/splitio/version.go @@ -1,4 +1,4 @@ package splitio // Version contains a string with the split sdk version -const Version = "6.1.8" +const Version = "6.2.1"
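
Usage sketch for the new NONE impressions mode introduced in 6.2.0 and exercised by `TestClientNone` / `TestClientNoneRedis` above. It assumes only the entry points this diff shows (`conf.Default`, `client.NewSplitFactory`, `Client()` with `BlockUntilReady`, `Treatment`, `Destroy`) and the `ImpressionsModeNone` constant from `go-split-commons/v4/conf`; the SDK key and feature flag name are placeholders.

```go
package main

import (
	"fmt"

	commonsCfg "github.com/splitio/go-split-commons/v4/conf"

	"github.com/splitio/go-client/v6/splitio/client"
	"github.com/splitio/go-client/v6/splitio/conf"
)

func main() {
	cfg := conf.Default()
	// NONE mode: the SDK tracks only unique keys per feature flag and
	// impression counts; individual impressions are not posted to Split.
	cfg.ImpressionsMode = commonsCfg.ImpressionsModeNone

	factory, err := client.NewSplitFactory("YOUR_SDK_KEY", cfg) // placeholder key
	if err != nil {
		fmt.Println("factory error:", err)
		return
	}

	c := factory.Client()
	if err := c.BlockUntilReady(10); err != nil {
		fmt.Println("SDK not ready:", err)
		return
	}

	// Evaluations return treatments as usual; only impression reporting changes.
	treatment := c.Treatment("user-123", "SOME_FEATURE_FLAG", nil) // placeholder flag
	fmt.Println("treatment:", treatment)

	// Destroy flushes pending unique keys and counts before shutdown.
	c.Destroy()
}
```

In standalone (in-memory) mode this setting maps to the `config.ImpressionsModeNone` branch of `buildImpressionManager` in factory.go, which wires the bloom-filter-backed unique-keys tracker, the unique-keys task, and the impressions-count task instead of the per-impression recorder.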