Skip to content

Commit

Permalink
Merge pull request #18 from splitio/task/treatmentsByFlagSets
Browse files Browse the repository at this point in the history
Added `treatmentsByFlagSets` and `treatmentsWithConfigByFlagSets` methods
  • Loading branch information
mmelograno authored Jan 9, 2024
2 parents 46914af + ae62ed5 commit 477cb0c
Show file tree
Hide file tree
Showing 9 changed files with 587 additions and 12 deletions.
2 changes: 1 addition & 1 deletion splitio/commitsha.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
package splitio

const CommitSHA = "9e51b85"
const CommitSHA = "b98165e"
28 changes: 28 additions & 0 deletions splitio/link/protocol/v1/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,34 @@ func NewTreatmentsResp(ok bool, data []sdk.EvaluationResult) *v1.ResponseWrapper
}
}

func NewTreatmentsByFlagSetResp(ok bool, data map[string]sdk.EvaluationResult) *v1.ResponseWrapper[v1.TreatmentsWithFeaturePayload] {
res := v1.ResultOk
if !ok {
res = v1.ResultInternalError
}

payload := make(map[string]v1.TreatmentPayload, len(data))
for f, r := range data {
p := v1.TreatmentPayload{
Treatment: r.Treatment,
Config: r.Config,
}
if r.Impression != nil {
p.ListenerData = &v1.ListenerExtraData{
Label: r.Impression.Label,
Timestamp: r.Impression.Time,
ChangeNumber: r.Impression.ChangeNumber,
}
}
payload[f] = p
}

return &v1.ResponseWrapper[v1.TreatmentsWithFeaturePayload]{
Status: res,
Payload: v1.TreatmentsWithFeaturePayload{Results: payload},
}
}

func NewTrackResp(ok bool) *v1.ResponseWrapper[v1.TrackPayload] {
res := v1.ResultOk
if !ok {
Expand Down
88 changes: 77 additions & 11 deletions splitio/link/protocol/v1/rpcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ const (
OCRegister OpCode = 0x00

// Treatment-related ops
OCTreatment OpCode = 0x11
OCTreatments OpCode = 0x12
OCTreatmentWithConfig OpCode = 0x13
OCTreatmentsWithConfig OpCode = 0x14
OCTreatmentsByFlagSet OpCode = 0x15
OCTreatmentsWithConfigByFlagSet OpCode = 0x16
OCTreatment OpCode = 0x11
OCTreatments OpCode = 0x12
OCTreatmentWithConfig OpCode = 0x13
OCTreatmentsWithConfig OpCode = 0x14
OCTreatmentsByFlagSet OpCode = 0x15
OCTreatmentsWithConfigByFlagSet OpCode = 0x16
OCTreatmentsByFlagSets OpCode = 0x17
OCTreatmentsWithConfigByFlagSets OpCode = 0x18

// Track-related ops
OCTrack OpCode = 0x80
Expand All @@ -45,6 +47,10 @@ func (o OpCode) String() string {
return "treatments-by-flag-set"
case OCTreatmentsWithConfigByFlagSet:
return "treatments-with-config-by-flag-set"
case OCTreatmentsByFlagSets:
return "treatments-by-flag-sets"
case OCTreatmentsWithConfigByFlagSets:
return "treatments-with-config-by-flag-sets"
case OCTrack:
return "track"
case OCSplitNames:
Expand Down Expand Up @@ -221,7 +227,7 @@ func (t *TreatmentsArgs) PopulateFromRPC(rpc *RPC) error {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsArgFeaturesIdx)}

}
t.Features, ok = sanitizeFeatureList(rawFeatureList)
t.Features, ok = sanitizeToStringSlice(rawFeatureList)
if !ok {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsArgFeaturesIdx)}
}
Expand Down Expand Up @@ -289,6 +295,66 @@ func (t *TreatmentsByFlagSetArgs) PopulateFromRPC(rpc *RPC) error {
return nil
}

const (
TreatmentsByFlagSetsArgKeyIdx int = 0
TreatmentsByFlagSetsArgBucketingKeyIdx int = 1
TreatmentsByFlagSetsArgFlagSetsIdx int = 2
TreatmentsByFlagSetsArgAttributesIdx int = 3
)

type TreatmentsByFlagSetsArgs struct {
Key string `msgpack:"k"`
BucketingKey *string `msgpack:"b"`
FlagSets []string `msgpack:"f"`
Attributes map[string]interface{} `msgpack:"a"`
}

func (r TreatmentsByFlagSetsArgs) Encode() []interface{} {
var bk string
if r.BucketingKey != nil {
bk = *r.BucketingKey
}
return []interface{}{r.Key, bk, r.FlagSets, r.Attributes}
}

func (t *TreatmentsByFlagSetsArgs) PopulateFromRPC(rpc *RPC) error {
if rpc.OpCode != OCTreatmentsByFlagSets && rpc.OpCode != OCTreatmentsWithConfigByFlagSets {
return RPCParseError{Code: PECOpCodeMismatch}
}
if len(rpc.Args) != 4 {
return RPCParseError{Code: PECWrongArgCount}
}

var ok bool
var err error

if t.Key, ok = rpc.Args[TreatmentsByFlagSetsArgKeyIdx].(string); !ok {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgKeyIdx)}
}

if t.BucketingKey, err = getOptionalRef[string](rpc.Args[TreatmentsByFlagSetsArgBucketingKeyIdx]); err != nil {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgBucketingKeyIdx)}
}

rawFlagSetsList, ok := rpc.Args[TreatmentsByFlagSetsArgFlagSetsIdx].([]interface{})
if !ok {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgFlagSetsIdx)}
}

t.FlagSets, ok = sanitizeToStringSlice(rawFlagSetsList)
if !ok {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgFlagSetsIdx)}
}

rawAttrs, err := getOptional[map[string]interface{}](rpc.Args[TreatmentsByFlagSetsArgAttributesIdx])
if err != nil {
return RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgAttributesIdx)}
}
t.Attributes = sanitizeAttributes(rawAttrs)

return nil
}

const (
TrackArgKeyIdx int = 0
TrackArgTrafficTypeIdx int = 1
Expand Down Expand Up @@ -476,16 +542,16 @@ func sanitizeAttributes(attrs map[string]interface{}) map[string]interface{} {
return attrs
}

func sanitizeFeatureList(raw []interface{}) ([]string, bool) {
features := make([]string, 0, len(raw))
func sanitizeToStringSlice(raw []interface{}) ([]string, bool) {
asStringSlice := make([]string, 0, len(raw))
for _, f := range raw {
asStr, ok := f.(string)
if !ok {
return nil, false
}
features = append(features, asStr)
asStringSlice = append(asStringSlice, asStr)
}
return features, true
return asStringSlice, true
}

func tryInt[T int8 | int16 | int32 | int64 | uint8 | uint16 | uint32 | uint64](x interface{}) (T, bool) {
Expand Down
130 changes: 130 additions & 0 deletions splitio/link/protocol/v1/rpcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func TestOpStringify(t *testing.T) {
assert.Equal(t, "treatment-with-config", OCTreatmentWithConfig.String())
assert.Equal(t, "treatments", OCTreatments.String())
assert.Equal(t, "treatments-with-config", OCTreatmentsWithConfig.String())
assert.Equal(t, "treatments-by-flag-set", OCTreatmentsByFlagSet.String())
assert.Equal(t, "treatments-with-config-by-flag-set", OCTreatmentsWithConfigByFlagSet.String())
assert.Equal(t, "treatments-by-flag-sets", OCTreatmentsByFlagSets.String())
assert.Equal(t, "treatments-with-config-by-flag-sets", OCTreatmentsWithConfigByFlagSets.String())
assert.Equal(t, "track", OCTrack.String())
assert.Equal(t, "split-names", OCSplitNames.String())
assert.Equal(t, "split", OCSplit.String())
Expand Down Expand Up @@ -177,6 +181,132 @@ func TestTreatmentsRPCParsing(t *testing.T) {
assert.Nil(t, r.Attributes)
}

func TestTreatmentsByFlagSetRPCParsing(t *testing.T) {
var r TreatmentsByFlagSetArgs
assert.Equal(t,
RPCParseError{Code: PECOpCodeMismatch},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCRegister, Args: nil}),
)
assert.Equal(t,
RPCParseError{Code: PECWrongArgCount},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSet, Args: []interface{}{}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetArgKeyIdx)},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSet, Args: []interface{}{nil, nil, nil, nil}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetArgBucketingKeyIdx)},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSet, Args: []interface{}{"key", 123, nil, nil}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetArgFlagSetIdx)},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSet, Args: []interface{}{"key", "bk", 123, nil}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetArgAttributesIdx)},
r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSet,
Args: []interface{}{"key", "bk", "set", 123}}),
)

err := r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSet,
Args: []interface{}{"key", "bk", "set", map[string]interface{}{"a": 1}}})
assert.Nil(t, err)
assert.Equal(t, "key", r.Key)
assert.Equal(t, lang.Ref("bk"), r.BucketingKey)
assert.Equal(t, "set", r.FlagSet)
assert.Equal(t, map[string]interface{}{"a": int64(1)}, r.Attributes)

// nil bucketing key
err = r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSet,
Args: []interface{}{"key", nil, "set", map[string]interface{}{"a": 1}}})
assert.Nil(t, err)
assert.Equal(t, "key", r.Key)
assert.Nil(t, r.BucketingKey)
assert.Equal(t, "set", r.FlagSet)
assert.Equal(t, map[string]interface{}{"a": int64(1)}, r.Attributes)

// nil attributes
err = r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSet,
Args: []interface{}{"key", "bk", "set", nil}})
assert.Nil(t, err)
assert.Equal(t, "key", r.Key)
assert.Equal(t, lang.Ref("bk"), r.BucketingKey)
assert.Equal(t, "set", r.FlagSet)
assert.Nil(t, r.Attributes)
}

func TestTreatmentsByFlagSetsRPCParsing(t *testing.T) {
var r TreatmentsByFlagSetsArgs
assert.Equal(t,
RPCParseError{Code: PECOpCodeMismatch},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCRegister, Args: nil}),
)
assert.Equal(t,
RPCParseError{Code: PECWrongArgCount},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSets, Args: []interface{}{}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgKeyIdx)},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSets, Args: []interface{}{nil, nil, nil, nil}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgBucketingKeyIdx)},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSets, Args: []interface{}{"key", 123, nil, nil}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgFlagSetsIdx)},
r.PopulateFromRPC(&RPC{RPCBase: protocol.RPCBase{Version: protocol.V1}, OpCode: OCTreatmentsByFlagSets, Args: []interface{}{"key", "bk", 123, nil}}),
)
assert.Equal(t,
RPCParseError{Code: PECInvalidArgType, Data: int64(TreatmentsByFlagSetsArgAttributesIdx)},
r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSets,
Args: []interface{}{"key", "bk", []interface{}{"set_1", "set_2"}, 123}}),
)

err := r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSets,
Args: []interface{}{"key", "bk", []interface{}{"set_1", "set_2"}, map[string]interface{}{"a": 1}}})
assert.Nil(t, err)
assert.Equal(t, "key", r.Key)
assert.Equal(t, lang.Ref("bk"), r.BucketingKey)
assert.Equal(t, []string{"set_1", "set_2"}, r.FlagSets)
assert.Equal(t, map[string]interface{}{"a": int64(1)}, r.Attributes)

// nil bucketing key
err = r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSets,
Args: []interface{}{"key", nil, []interface{}{"set_1", "set_2"}, map[string]interface{}{"a": 1}}})
assert.Nil(t, err)
assert.Equal(t, "key", r.Key)
assert.Nil(t, r.BucketingKey)
assert.Equal(t, []string{"set_1", "set_2"}, r.FlagSets)
assert.Equal(t, map[string]interface{}{"a": int64(1)}, r.Attributes)

// nil attributes
err = r.PopulateFromRPC(&RPC{
RPCBase: protocol.RPCBase{Version: protocol.V1},
OpCode: OCTreatmentsByFlagSets,
Args: []interface{}{"key", "bk", []interface{}{"set_1", "set_2"}, nil}})
assert.Nil(t, err)
assert.Equal(t, "key", r.Key)
assert.Equal(t, lang.Ref("bk"), r.BucketingKey)
assert.Equal(t, []string{"set_1", "set_2"}, r.FlagSets)
assert.Nil(t, r.Attributes)
}

func TestTrackRPCParsing(t *testing.T) {
var r TrackArgs
assert.Equal(t,
Expand Down
45 changes: 45 additions & 0 deletions splitio/link/service/v1/clientmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ func (m *ClientManager) dispatchRPC(rpc *protov1.RPC) (interface{}, error) {
return m.handleGetTreatmentsByFlagSet(rpc, false)
case protov1.OCTreatmentsWithConfigByFlagSet:
return m.handleGetTreatmentsByFlagSet(rpc, true)
case protov1.OCTreatmentsByFlagSets:
return m.handleGetTreatmentsByFlagSets(rpc, false)
case protov1.OCTreatmentsWithConfigByFlagSets:
return m.handleGetTreatmentsByFlagSets(rpc, true)
case protov1.OCTrack:
return m.handleTrack(rpc)
case protov1.OCSplitNames:
Expand Down Expand Up @@ -273,6 +277,47 @@ func (m *ClientManager) handleGetTreatmentsByFlagSet(rpc *protov1.RPC, withConfi
return response, nil
}

func (m *ClientManager) handleGetTreatmentsByFlagSets(rpc *protov1.RPC, withConfig bool) (interface{}, error) {

var args protov1.TreatmentsByFlagSetsArgs
if err := args.PopulateFromRPC(rpc); err != nil {
return nil, fmt.Errorf("error parsing treatments arguments: %w", err)
}

res, err := m.splitSDK.TreatmentsByFlagSets(m.clientConfig, args.Key, args.BucketingKey, args.FlagSets, args.Attributes)
if err != nil {
return &protov1.ResponseWrapper[protov1.TreatmentsWithFeaturePayload]{Status: protov1.ResultInternalError}, err
}

results := make(map[string]protov1.TreatmentPayload, len(res))
for feature, evaluationResult := range res {
currentPayload := protov1.TreatmentPayload{
Treatment: evaluationResult.Treatment,
}

if m.clientConfig.ReturnImpressionData && evaluationResult.Impression != nil {
currentPayload.ListenerData = &protov1.ListenerExtraData{
Label: evaluationResult.Impression.Label,
Timestamp: evaluationResult.Impression.Time,
ChangeNumber: evaluationResult.Impression.ChangeNumber,
}
}

if withConfig {
currentPayload.Config = evaluationResult.Config
}

results[feature] = currentPayload
}

response := &protov1.ResponseWrapper[protov1.TreatmentsWithFeaturePayload]{
Status: protov1.ResultOk,
Payload: protov1.TreatmentsWithFeaturePayload{Results: results},
}

return response, nil
}

func (m *ClientManager) handleTrack(rpc *protov1.RPC) (interface{}, error) {

var args protov1.TrackArgs
Expand Down
Loading

0 comments on commit 477cb0c

Please sign in to comment.