Skip to content

Commit

Permalink
chore: add large lag threshold test for Kafka scalers (#5361)
Browse files Browse the repository at this point in the history
Signed-off-by: dttung2905 <[email protected]>
  • Loading branch information
dttung2905 authored Jan 12, 2024
1 parent 6c1a67d commit a99c79e
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 121 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ New deprecation(s):
- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))
- **CPU scaler**: Wait for metrics window during CPU scaler tests ([#5294](https://github.com/kedacore/keda/pull/5294))
- **Hashicorp Vault**: Improve test coverage in `pkg/scaling/resolver/hashicorpvault_handler` ([#5195](https://github.com/kedacore/keda/issues/5195))
- **Kafka Scaler**: Add more test cases for large value of LagThreshold ([#5354](https://github.com/kedacore/keda/issues/5354))
- **Openstack Scaler**: Use Gophercloud SDK ([#3439](https://github.com/kedacore/keda/issues/3439))

## v2.12.1
Expand Down
120 changes: 59 additions & 61 deletions pkg/scalers/apache_kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strconv"
"testing"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -76,6 +77,8 @@ var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// failure, lagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, LagThreshold is 1000000
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "1000000", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, activationLagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success
Expand Down Expand Up @@ -231,78 +234,65 @@ var parseApacheKafkaAuthParamsTestDataset2 = []parseApacheKafkaAuthParamsTestDat
}

var apacheKafkaMetricIdentifiers = []apacheKafkaMetricIdentifier{
{&parseApacheKafkaMetadataTestDataset[10], 0, "s0-kafka-my-topics"},
{&parseApacheKafkaMetadataTestDataset[10], 1, "s1-kafka-my-topics"},
{&parseApacheKafkaMetadataTestDataset[11], 0, "s0-kafka-my-topics"},
{&parseApacheKafkaMetadataTestDataset[11], 1, "s1-kafka-my-topics"},
{&parseApacheKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}

func TestApacheKafkaGetBrokers(t *testing.T) {
for _, testData := range parseApacheKafkaMetadataTestDataset {
meta, err := parseApacheKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithAuthParams}, logr.Discard())

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if !reflect.DeepEqual(testData.topic, meta.topic) {
t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
getBrokerApacheKafkaTestBase(t, meta, testData, err)

meta, err = parseApacheKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validApacheKafkaWithoutAuthParams}, logr.Discard())

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if !reflect.DeepEqual(testData.topic, meta.topic) {
t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
}
if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
}
if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
}
getBrokerApacheKafkaTestBase(t, meta, testData, err)
}
}

func getBrokerApacheKafkaTestBase(t *testing.T, meta apacheKafkaMetadata, testData parseApacheKafkaMetadataTestData, err error) {
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %#v but got %#v\n", testData.brokers, meta.bootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if !reflect.DeepEqual(testData.topic, meta.topic) {
t.Errorf("Expected topics %#v but got %#v\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %#v but got %#v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
}
if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
}
if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
}

expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata)
if er != nil {
t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"])
}

if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold {
t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
}
}
func TestApacheKafkaAuthParams(t *testing.T) {
// Testing tls and sasl value in TriggerAuthentication
for _, testData := range parseApacheKafkaAuthParamsTestDataset {
Expand Down Expand Up @@ -382,3 +372,11 @@ func TestApacheKafkaGetMetricSpecForScaling(t *testing.T) {
}
}
}

func parseExpectedLagThreshold(metadata map[string]string) (int64, error) {
val, ok := metadata["lagThreshold"]
if !ok {
return 0, nil
}
return strconv.ParseInt(val, 10, 64)
}
109 changes: 49 additions & 60 deletions pkg/scalers/kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
// failure, lagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
// success, lagThreshold is 1000000
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "1000000", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
// failure, activationLagThreshold is not int
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false},
// success, activationLagThreshold is 0
Expand Down Expand Up @@ -309,75 +311,62 @@ var parseKafkaOAuthbrearerAuthParamsTestDataset = []parseKafkaAuthParamsTestData
}

var kafkaMetricIdentifiers = []kafkaMetricIdentifier{
{&parseKafkaMetadataTestDataset[10], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[10], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[11], 0, "s0-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[11], 1, "s1-kafka-my-topic"},
{&parseKafkaMetadataTestDataset[2], 1, "s1-kafka-my-group-topics"},
}

func TestGetBrokers(t *testing.T) {
for _, testData := range parseKafkaMetadataTestDataset {
meta, err := parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithAuthParams}, logr.Discard())

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %v but got %v\n", testData.brokers, meta.bootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
getBrokerTestBase(t, meta, testData, err)

meta, err = parseKafkaMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validWithoutAuthParams}, logr.Discard())
getBrokerTestBase(t, meta, testData, err)
}
}

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %v but got %v\n", testData.brokers, meta.bootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
}
if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
}
if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
}
func getBrokerTestBase(t *testing.T, meta kafkaMetadata, testData parseKafkaMetadataTestData, err error) {
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
if len(meta.bootstrapServers) != testData.numBrokers {
t.Errorf("Expected %d bootstrap servers but got %d\n", testData.numBrokers, len(meta.bootstrapServers))
}
if !reflect.DeepEqual(testData.brokers, meta.bootstrapServers) {
t.Errorf("Expected %v but got %v\n", testData.brokers, meta.bootstrapServers)
}
if meta.group != testData.group {
t.Errorf("Expected group %s but got %s\n", testData.group, meta.group)
}
if meta.topic != testData.topic {
t.Errorf("Expected topic %s but got %s\n", testData.topic, meta.topic)
}
if !reflect.DeepEqual(testData.partitionLimitation, meta.partitionLimitation) {
t.Errorf("Expected %v but got %v\n", testData.partitionLimitation, meta.partitionLimitation)
}
if err == nil && meta.offsetResetPolicy != testData.offsetResetPolicy {
t.Errorf("Expected offsetResetPolicy %s but got %s\n", testData.offsetResetPolicy, meta.offsetResetPolicy)
}
if err == nil && meta.allowIdleConsumers != testData.allowIdleConsumers {
t.Errorf("Expected allowIdleConsumers %t but got %t\n", testData.allowIdleConsumers, meta.allowIdleConsumers)
}
if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
}
if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
}
expectedLagThreshold, er := parseExpectedLagThreshold(testData.metadata)
if er != nil {
t.Errorf("Unable to convert test data lagThreshold %s to string", testData.metadata["lagThreshold"])
}

if meta.lagThreshold != expectedLagThreshold && meta.lagThreshold != defaultKafkaLagThreshold {
t.Errorf("Expected lagThreshold to be either %v or %v got %v ", meta.lagThreshold, defaultKafkaLagThreshold, expectedLagThreshold)
}
}

Expand Down

0 comments on commit a99c79e

Please sign in to comment.