diff --git a/.golangci.yml b/.golangci.yml index f14a447b4dc..98d7f869132 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -97,6 +97,16 @@ issues: - path: datadog_scaler.go linters: - gocyclo + # Exclude for kafka_scaler, reason: + # Introduce new parameters to set number of partitions w/lag issue #3997 (PR #5060) + - path: kafka_scaler.go + linters: + - gocyclo + # Exclude for apache_kafka_scaler, reason: + # Introduce new parameters to set number of partitions w/lag issue #3997 (PR #5060) + - path: apache_kafka_scaler.go + linters: + - gocyclo # Exclude for mongo_scaler and couchdb_scaler, reason: # pkg/scalers/couchdb_scaler.go:144: 144-174 lines are duplicate of `pkg/scalers/mongo_scaler.go:155-185` (dupl) - path: couchdb_scaler.go diff --git a/CHANGELOG.md b/CHANGELOG.md index dc6cf0a6a31..3fce31a17da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -62,6 +62,7 @@ Here is an overview of all new **experimental** features: - **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962)) - **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997)) - **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836)) ### Fixes diff --git a/pkg/scalers/apache_kafka_scaler.go b/pkg/scalers/apache_kafka_scaler.go index af76cf24dcd..b06e71bdcb1 100644 --- a/pkg/scalers/apache_kafka_scaler.go +++ b/pkg/scalers/apache_kafka_scaler.go @@ -60,6 +60,7 @@ type apacheKafkaMetadata struct { // If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can // occur or scale to 0 (true). 
See discussion in https://github.com/kedacore/keda/issues/2612 scaleToZeroOnInvalidOffset bool + limitToPartitionsWithLag bool // SASL saslType kafkaSaslType @@ -339,6 +340,22 @@ func parseApacheKafkaMetadata(config *ScalerConfig, logger logr.Logger) (apacheK meta.scaleToZeroOnInvalidOffset = t } + meta.limitToPartitionsWithLag = false + if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err) + } + meta.limitToPartitionsWithLag = t + + if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") + } + if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") + } + } + meta.scalerIndex = config.ScalerIndex return meta, nil } @@ -639,6 +656,7 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro totalLag := int64(0) totalLagWithPersistent := int64(0) totalTopicPartitions := int64(0) + partitionsWithLag := int64(0) for topic, partitionsOffsets := range producerOffsets { for partition := range partitionsOffsets { @@ -648,6 +666,10 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro } totalLag += lag totalLagWithPersistent += lagWithPersistent + + if lag > 0 { + partitionsWithLag++ + } } totalTopicPartitions += (int64)(len(partitionsOffsets)) } @@ -655,10 +677,15 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Consumer offsets %v, producer offsets %v", consumerOffsets, producerOffsets)) - if !s.metadata.allowIdleConsumers { - // don't scale out beyond the number of topicPartitions - if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions { - totalLag = totalTopicPartitions * 
s.metadata.lagThreshold + if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag { + // don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings + upperBound := totalTopicPartitions + if s.metadata.limitToPartitionsWithLag { + upperBound = partitionsWithLag + } + + if (totalLag / s.metadata.lagThreshold) > upperBound { + totalLag = upperBound * s.metadata.lagThreshold } } return totalLag, totalLagWithPersistent, nil diff --git a/pkg/scalers/apache_kafka_scaler_test.go b/pkg/scalers/apache_kafka_scaler_test.go index 1d7d5539ded..c5ab714dc1b 100644 --- a/pkg/scalers/apache_kafka_scaler_test.go +++ b/pkg/scalers/apache_kafka_scaler_test.go @@ -10,16 +10,17 @@ import ( ) type parseApacheKafkaMetadataTestData struct { - metadata map[string]string - isError bool - numBrokers int - brokers []string - group string - topic []string - partitionLimitation []int32 - offsetResetPolicy offsetResetPolicy - allowIdleConsumers bool - excludePersistentLag bool + metadata map[string]string + isError bool + numBrokers int + brokers []string + group string + topic []string + partitionLimitation []int32 + offsetResetPolicy offsetResetPolicy + allowIdleConsumers bool + excludePersistentLag bool + limitToPartitionsWithLag bool } type parseApacheKafkaAuthParamsTestData struct { @@ -62,49 +63,59 @@ var validApacheKafkaWithoutAuthParams = map[string]string{} var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{ // failure, no consumer group - {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false}, + {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false, false}, // success, no topics - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, 
false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, // success, ignore partitionLimitation if no topics - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation with whitespaced limitation value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is negative value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), 
false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is 0 - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, activationLagThreshold is 0 - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as list - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": 
"my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as range - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation mixed list + ranges - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false}, // failure, partitionLimitation wrong data type - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, 
offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, more brokers - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, offsetResetPolicy policy latest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // failure, offsetResetPolicy policy wrong - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, "", false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, "", false, false, false}, // success, offsetResetPolicy policy 
earliest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("earliest"), false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("earliest"), false, false, false}, // failure, allowIdleConsumers malformed - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, allowIdleConsumers is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false}, // failure, excludePersistentLag is malformed - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, 
false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false}, // success, excludePersistentLag is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, true, false}, // success, version supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false}, + // success, limitToPartitionsWithLag is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "limitToPartitionsWithLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, true}, + // failure, limitToPartitionsWithLag is malformed + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "limitToPartitionsWithLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, 
offsetResetPolicy("latest"), false, false, false}, + // failure, allowIdleConsumers and limitToPartitionsWithLag cannot be set to true simultaneously + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, true}, + // success, allowIdleConsumers can be set when limitToPartitionsWithLag is false + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "false"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false}, + // failure, topic must be specified when limitToPartitionsWithLag is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, true}, } var parseApacheKafkaAuthParamsTestDataset = []parseApacheKafkaAuthParamsTestData{ @@ -286,6 +297,9 @@ func TestApacheKafkaGetBrokers(t *testing.T) { if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) } + if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag { + t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag) + } } } diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 5b1e92522d3..7d198c11eda 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -65,6 +65,7 @@ type kafkaMetadata struct { // If an invalid offset is found, whether to scale to 1 (false 
- the default) so consumption can // occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612 scaleToZeroOnInvalidOffset bool + limitToPartitionsWithLag bool // SASL saslType kafkaSaslType @@ -468,6 +469,22 @@ func parseKafkaMetadata(config *ScalerConfig, logger logr.Logger) (kafkaMetadata meta.scaleToZeroOnInvalidOffset = t } + meta.limitToPartitionsWithLag = false + if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok { + t, err := strconv.ParseBool(val) + if err != nil { + return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err) + } + meta.limitToPartitionsWithLag = t + + if meta.allowIdleConsumers && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously") + } + if len(meta.topic) == 0 && meta.limitToPartitionsWithLag { + return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag") + } + } + meta.version = sarama.V1_0_0_0 if val, ok := config.TriggerMetadata["version"]; ok { val = strings.TrimSpace(val) @@ -797,6 +814,7 @@ func (s *kafkaScaler) getTotalLag() (int64, int64, error) { totalLag := int64(0) totalLagWithPersistent := int64(0) totalTopicPartitions := int64(0) + partitionsWithLag := int64(0) for topic, partitionsOffsets := range producerOffsets { for partition := range partitionsOffsets { @@ -806,15 +824,24 @@ func (s *kafkaScaler) getTotalLag() (int64, int64, error) { } totalLag += lag totalLagWithPersistent += lagWithPersistent + + if lag > 0 { + partitionsWithLag++ + } } totalTopicPartitions += (int64)(len(partitionsOffsets)) } s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, len(topicPartitions), s.metadata.lagThreshold)) - if !s.metadata.allowIdleConsumers { - // don't scale out beyond the number of topicPartitions - if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions { - totalLag = 
totalTopicPartitions * s.metadata.lagThreshold + if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag { + // don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings + upperBound := totalTopicPartitions + if s.metadata.limitToPartitionsWithLag { + upperBound = partitionsWithLag + } + + if (totalLag / s.metadata.lagThreshold) > upperBound { + totalLag = upperBound * s.metadata.lagThreshold } } return totalLag, totalLagWithPersistent, nil diff --git a/pkg/scalers/kafka_scaler_test.go b/pkg/scalers/kafka_scaler_test.go index efab0e7c2b8..5ff3274103d 100644 --- a/pkg/scalers/kafka_scaler_test.go +++ b/pkg/scalers/kafka_scaler_test.go @@ -14,16 +14,17 @@ import ( ) type parseKafkaMetadataTestData struct { - metadata map[string]string - isError bool - numBrokers int - brokers []string - group string - topic string - partitionLimitation []int32 - offsetResetPolicy offsetResetPolicy - allowIdleConsumers bool - excludePersistentLag bool + metadata map[string]string + isError bool + numBrokers int + brokers []string + group string + topic string + partitionLimitation []int32 + offsetResetPolicy offsetResetPolicy + allowIdleConsumers bool + excludePersistentLag bool + limitToPartitionsWithLag bool } type parseKafkaAuthParamsTestData struct { @@ -66,55 +67,65 @@ var validWithoutAuthParams = map[string]string{} var parseKafkaMetadataTestDataset = []parseKafkaMetadataTestData{ // failure, no bootstrapServers - {map[string]string{}, true, 0, nil, "", "", nil, "", false, false}, + {map[string]string{}, true, 0, nil, "", "", nil, "", false, false, false}, // failure, no consumer group - {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", nil, "latest", false, false}, + {map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", "", nil, "latest", false, false, false}, // success, no topic - {map[string]string{"bootstrapServers": "foobar:9092", 
"consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false, false}, // success, ignore partitionLimitation if no topic - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation with whitespaced limitation value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false, false}, // success, no limitation - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false, false}, // failure, version not supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", 
"my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is negative value - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // failure, lagThreshold is 0 - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // failure, activationLagThreshold is not int - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "AA"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // success, activationLagThreshold is 0 - 
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // success - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as list - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation as range - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": 
"1-4"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false}, // success, partitionLimitation mixed list + ranges - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false}, // failure, partitionLimitation wrong data type - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // success, more brokers - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // success, offsetResetPolicy policy latest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, 
false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // failure, offsetResetPolicy policy wrong - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, "", false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, "", false, false, false}, // success, offsetResetPolicy policy earliest - {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("earliest"), false, false}, + {map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topic", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("earliest"), false, false, false}, // failure, allowIdleConsumers malformed - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, 
offsetResetPolicy("latest"), false, false, false}, // success, allowIdleConsumers is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false, false}, // failure, excludePersistentLag is malformed - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, // success, excludePersistentLag is true - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, true}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, true, false}, // success, version supported - {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false}, + 
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "version": "1.0.0"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false, false}, + // success, limitToPartitionsWithLag is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "limitToPartitionsWithLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, true}, + // failure, limitToPartitionsWithLag is malformed + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "limitToPartitionsWithLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), false, false, false}, + // failure, allowIdleConsumers and limitToPartitionsWithLag cannot be set to true simultaneously + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false, true}, + // success, allowIdleConsumers can be set when limitToPartitionsWithLag is false + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topic", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "false"}, false, 1, []string{"foobar:9092"}, "my-group", "my-topic", nil, offsetResetPolicy("latest"), true, false, false}, + // failure, topic must be specified when limitToPartitionsWithLag is true + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", "", nil, offsetResetPolicy("latest"), false, false, true}, } var parseKafkaAuthParamsTestDataset = 
[]parseKafkaAuthParamsTestData{ @@ -364,6 +375,9 @@ func TestGetBrokers(t *testing.T) { if err == nil && meta.excludePersistentLag != testData.excludePersistentLag { t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag) } + if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag { + t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag) + } } } diff --git a/tests/scalers/apache_kafka/apache_kafka_test.go b/tests/scalers/apache_kafka/apache_kafka_test.go index c5cb6524f7c..f31670f6427 100644 --- a/tests/scalers/apache_kafka/apache_kafka_test.go +++ b/tests/scalers/apache_kafka/apache_kafka_test.go @@ -23,40 +23,43 @@ const ( ) var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - deploymentName = fmt.Sprintf("%s-deployment", testName) - kafkaName = fmt.Sprintf("%s-kafka", testName) - kafkaClientName = fmt.Sprintf("%s-client", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) - topic1 = "kafka-topic" - topic2 = "kafka-topic2" - zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" - oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" - invalidOffsetGroup = "invalidOffset" - persistentLagTopic = "kafka-topic-persistent-lag" - persistentLagGroup = "persistentLag" - persistentLagDeploymentGroup = "persistentLagDeploymentGroup" - topicPartitions = 3 + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + kafkaName = fmt.Sprintf("%s-kafka", testName) + kafkaClientName = fmt.Sprintf("%s-client", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) + topic1 = "kafka-topic" + topic2 = "kafka-topic2" + zeroInvalidOffsetTopic = 
"kafka-topic-zero-invalid-offset" + oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" + invalidOffsetGroup = "invalidOffset" + persistentLagTopic = "kafka-topic-persistent-lag" + persistentLagGroup = "persistentLag" + persistentLagDeploymentGroup = "persistentLagDeploymentGroup" + limitToPartitionsWithLagTopic = "limit-to-partitions-with-lag" + limitToPartitionsWithLagGroup = "limitToPartitionsWithLag" + topicPartitions = 3 ) type templateData struct { - TestNamespace string - DeploymentName string - ScaledObjectName string - KafkaName string - KafkaTopicName string - KafkaTopicPartitions int - KafkaClientName string - TopicName string - Topic1Name string - Topic2Name string - BootstrapServer string - ResetPolicy string - Params string - Commit string - ScaleToZeroOnInvalid string - ExcludePersistentLag string + TestNamespace string + DeploymentName string + ScaledObjectName string + KafkaName string + KafkaTopicName string + KafkaTopicPartitions int + KafkaClientName string + TopicName string + Topic1Name string + Topic2Name string + BootstrapServer string + ResetPolicy string + Params string + Commit string + ScaleToZeroOnInvalid string + ExcludePersistentLag string + LimitToPartitionsWithLag string } const ( @@ -277,6 +280,45 @@ spec: excludePersistentLag: '{{.ExcludePersistentLag}}' offsetResetPolicy: 'latest'` + limitToPartionsWithLagScaledObjectTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 0 + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: apache-kafka + metadata: + topic: {{.TopicName}} + 
bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + offsetResetPolicy: 'earliest' + lagThreshold: '1' + activationLagThreshold: '1' + limitToPartitionsWithLag: '{{.LimitToPartitionsWithLag}}'` + kafkaClusterTemplate = `apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: @@ -356,6 +398,7 @@ func TestScaler(t *testing.T) { addTopic(t, data, zeroInvalidOffsetTopic, 1) addTopic(t, data, oneInvalidOffsetTopic, 1) addTopic(t, data, persistentLagTopic, topicPartitions) + addTopic(t, data, limitToPartitionsWithLagTopic, topicPartitions) // test scaling testEarliestPolicy(t, kc, data) @@ -364,6 +407,7 @@ func TestScaler(t *testing.T) { testZeroOnInvalidOffset(t, kc, data) testOneOnInvalidOffset(t, kc, data) testPersistentLag(t, kc, data) + testScalingOnlyPartitionsWithLag(t, kc, data) } func testEarliestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { @@ -507,6 +551,13 @@ func publishMessage(t *testing.T, topic string) { assert.NoErrorf(t, err, "cannot execute command - %s", err) } +// publish a message to a specific partition; We can't specify the exact partition, +// but any messages with the same key will end up in the same partition +func publishMessagePartitionKey(t *testing.T, topic string, key string) { + _, _, err := ExecCommandOnSpecificPod(t, kafkaClientName, testNamespace, fmt.Sprintf(`echo -e "%s\t {"text": "foo"}" | kafka-console-producer --property parse.key=true --broker-list %s --topic %s`, key, bootstrapServer, topic)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + func commitPartition(t *testing.T, topic string, group string) { _, _, err := ExecCommandOnSpecificPod(t, kafkaClientName, testNamespace, fmt.Sprintf(`kafka-console-consumer --bootstrap-server %s --topic %s --group %s --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000`, bootstrapServer, topic, group)) assert.NoErrorf(t, err, "cannot execute command - %s", err) @@ -555,6 +606,61 @@ func 
testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData KubectlDeleteWithTemplate(t, data, "persistentLagScaledObjectTemplate", persistentLagScaledObjectTemplate) } +func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing limitToPartitionsWithLag: no scale out ---") + + // Simulate Consumption from topic by consumer group + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) + commitPartition(t, limitToPartitionsWithLagTopic, "latest") + + data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup) + data.Commit = StringFalse + data.TopicName = limitToPartitionsWithLagTopic + data.LimitToPartitionsWithLag = StringTrue + data.ResetPolicy = "latest" + + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "limitToPartionsWithLagScaledObjectTemplate", limitToPartionsWithLagScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "limitToPartionsWithLagScaledObjectTemplate", limitToPartionsWithLagScaledObjectTemplate) + + // Shouldn't scale pods applying latest policy + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) + + // Scale application with kafka messages in persistentLagTopic + firstPartitionKey := "my-first-key" + + // Shouldn't scale pods with only 1 message due to activation value + publishMessagePartitionKey(t, limitToPartitionsWithLagTopic, firstPartitionKey) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) + + // Publish 5 messages to the same partition + messages := 5 + for i := 0; i < messages; i++ { + publishMessagePartitionKey(t, limitToPartitionsWithLagTopic, firstPartitionKey) + } + + assert.True(t,
WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + // Partition lag should not scale pod above 1 replicas after 2 reconciliation cycles + // because we only have lag on 1 partition + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 1, 60) + + // publish new messages on a separate partition + secondPartitionKey := "my-second-key" + for i := 0; i < messages; i++ { + publishMessagePartitionKey(t, limitToPartitionsWithLagTopic, secondPartitionKey) + } + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 2), + "replica count should be %d after 2 minute", 2) + + // Partition lag should not scale pod above 2 replicas after 2 reconciliation cycles + // because we only have lag on 2 partitions + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 2, 60) +} + func addTopic(t *testing.T, data templateData, name string, partitions int) { t.Log("--- adding kafka topic" + name + " and partitions " + strconv.Itoa(partitions) + " ---") data.KafkaTopicName = name diff --git a/tests/scalers/kafka/kafka_test.go b/tests/scalers/kafka/kafka_test.go index b0f929c5d8a..e28f22d278e 100644 --- a/tests/scalers/kafka/kafka_test.go +++ b/tests/scalers/kafka/kafka_test.go @@ -23,40 +23,43 @@ const ( ) var ( - testNamespace = fmt.Sprintf("%s-ns", testName) - deploymentName = fmt.Sprintf("%s-deployment", testName) - kafkaName = fmt.Sprintf("%s-kafka", testName) - kafkaClientName = fmt.Sprintf("%s-client", testName) - scaledObjectName = fmt.Sprintf("%s-so", testName) - bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) - topic1 = "kafka-topic" - topic2 = "kafka-topic2" - zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" - oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" - invalidOffsetGroup = "invalidOffset" - persistentLagTopic = 
"kafka-topic-persistent-lag" - persistentLagGroup = "persistentLag" - persistentLagDeploymentGroup = "persistentLagDeploymentGroup" - topicPartitions = 3 + testNamespace = fmt.Sprintf("%s-ns", testName) + deploymentName = fmt.Sprintf("%s-deployment", testName) + kafkaName = fmt.Sprintf("%s-kafka", testName) + kafkaClientName = fmt.Sprintf("%s-client", testName) + scaledObjectName = fmt.Sprintf("%s-so", testName) + bootstrapServer = fmt.Sprintf("%s-kafka-bootstrap.%s:9092", kafkaName, testNamespace) + topic1 = "kafka-topic" + topic2 = "kafka-topic2" + zeroInvalidOffsetTopic = "kafka-topic-zero-invalid-offset" + oneInvalidOffsetTopic = "kafka-topic-one-invalid-offset" + invalidOffsetGroup = "invalidOffset" + persistentLagTopic = "kafka-topic-persistent-lag" + persistentLagGroup = "persistentLag" + persistentLagDeploymentGroup = "persistentLagDeploymentGroup" + limitToPartitionsWithLagTopic = "limit-to-partitions-with-lag" + limitToPartitionsWithLagGroup = "limitToPartitionsWithLag" + topicPartitions = 3 ) type templateData struct { - TestNamespace string - DeploymentName string - ScaledObjectName string - KafkaName string - KafkaTopicName string - KafkaTopicPartitions int - KafkaClientName string - TopicName string - Topic1Name string - Topic2Name string - BootstrapServer string - ResetPolicy string - Params string - Commit string - ScaleToZeroOnInvalid string - ExcludePersistentLag string + TestNamespace string + DeploymentName string + ScaledObjectName string + KafkaName string + KafkaTopicName string + KafkaTopicPartitions int + KafkaClientName string + TopicName string + Topic1Name string + Topic2Name string + BootstrapServer string + ResetPolicy string + Params string + Commit string + ScaleToZeroOnInvalid string + ExcludePersistentLag string + LimitToPartitionsWithLag string } const ( @@ -276,6 +279,45 @@ spec: excludePersistentLag: '{{.ExcludePersistentLag}}' offsetResetPolicy: 'latest'` + limitToPartionsWithLagScaledObjectTemplate = ` +apiVersion: 
keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: {{.ScaledObjectName}} + namespace: {{.TestNamespace}} + labels: + app: {{.DeploymentName}} +spec: + pollingInterval: 5 + cooldownPeriod: 0 + scaleTargetRef: + name: {{.DeploymentName}} + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleUp: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + scaleDown: + stabilizationWindowSeconds: 0 + policies: + - type: Percent + value: 100 + periodSeconds: 15 + triggers: + - type: kafka + metadata: + topic: {{.TopicName}} + bootstrapServers: {{.BootstrapServer}} + consumerGroup: {{.ResetPolicy}} + offsetResetPolicy: 'earliest' + lagThreshold: '1' + activationLagThreshold: '1' + limitToPartitionsWithLag: '{{.LimitToPartitionsWithLag}}'` + kafkaClusterTemplate = `apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka metadata: @@ -355,6 +397,7 @@ func TestScaler(t *testing.T) { addTopic(t, data, zeroInvalidOffsetTopic, 1) addTopic(t, data, oneInvalidOffsetTopic, 1) addTopic(t, data, persistentLagTopic, topicPartitions) + addTopic(t, data, limitToPartitionsWithLagTopic, topicPartitions) // test scaling testEarliestPolicy(t, kc, data) @@ -363,6 +406,7 @@ func TestScaler(t *testing.T) { testZeroOnInvalidOffset(t, kc, data) testOneOnInvalidOffset(t, kc, data) testPersistentLag(t, kc, data) + testScalingOnlyPartitionsWithLag(t, kc, data) } func testEarliestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) { @@ -505,6 +549,13 @@ func publishMessage(t *testing.T, topic string) { assert.NoErrorf(t, err, "cannot execute command - %s", err) } +// publish a message to a specific partition; We can't specify the exact partition, +// but any messages with the same key will end up in the same partition +func publishMessagePartitionKey(t *testing.T, topic string, key string) { + _, _, err := ExecCommandOnSpecificPod(t, kafkaClientName, testNamespace, fmt.Sprintf(`echo -e "%s\t {"text": "foo"}" | kafka-console-producer 
--property parse.key=true --broker-list %s --topic %s`, key, bootstrapServer, topic)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) +} + func commitPartition(t *testing.T, topic string, group string) { _, _, err := ExecCommandOnSpecificPod(t, kafkaClientName, testNamespace, fmt.Sprintf(`kafka-console-consumer --bootstrap-server %s --topic %s --group %s --from-beginning --consumer-property enable.auto.commit=true --timeout-ms 15000`, bootstrapServer, topic, group)) assert.NoErrorf(t, err, "cannot execute command - %s", err) @@ -553,6 +604,61 @@ func testPersistentLag(t *testing.T, kc *kubernetes.Clientset, data templateData KubectlDeleteWithTemplate(t, data, "persistentLagScaledObjectTemplate", persistentLagScaledObjectTemplate) } +func testScalingOnlyPartitionsWithLag(t *testing.T, kc *kubernetes.Clientset, data templateData) { + t.Log("--- testing limitToPartitionsWithLag: no scale out ---") + + // Simulate Consumption from topic by consumer group + // To avoid edge case where scaling could be effectively disabled (Consumer never makes a commit) + commitPartition(t, limitToPartitionsWithLagTopic, "latest") + + data.Params = fmt.Sprintf("--topic %s --group %s", limitToPartitionsWithLagTopic, limitToPartitionsWithLagGroup) + data.Commit = StringFalse + data.TopicName = limitToPartitionsWithLagTopic + data.LimitToPartitionsWithLag = StringTrue + data.ResetPolicy = "latest" + + KubectlApplyWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + defer KubectlDeleteWithTemplate(t, data, "singleDeploymentTemplate", singleDeploymentTemplate) + KubectlApplyWithTemplate(t, data, "limitToPartionsWithLagScaledObjectTemplate", limitToPartionsWithLagScaledObjectTemplate) + defer KubectlDeleteWithTemplate(t, data, "limitToPartionsWithLagScaledObjectTemplate", limitToPartionsWithLagScaledObjectTemplate) + + // Shouldn't scale pods applying latest policy + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace,
0, 30) + + // Scale application with kafka messages in persistentLagTopic + firstPartitionKey := "my-first-key" + + // Shouldn't scale pods with only 1 message due to activation value + publishMessagePartitionKey(t, limitToPartitionsWithLagTopic, firstPartitionKey) + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 0, 30) + + // Publish 5 messages to the same partition + messages := 5 + for i := 0; i < messages; i++ { + publishMessagePartitionKey(t, limitToPartitionsWithLagTopic, firstPartitionKey) + } + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 1, 60, 2), + "replica count should be %d after 2 minute", 1) + + // Partition lag should not scale pod above 1 replicas after 2 reconciliation cycles + // because we only have lag on 1 partition + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 1, 60) + + // publish new messages on a separate partition + secondPartitionKey := "my-second-key" + for i := 0; i < messages; i++ { + publishMessagePartitionKey(t, limitToPartitionsWithLagTopic, secondPartitionKey) + } + + assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, 2, 60, 2), + "replica count should be %d after 2 minute", 2) + + // Partition lag should not scale pod above 2 replicas after 2 reconciliation cycles + // because we only have lag on 2 partitions + AssertReplicaCountNotChangeDuringTimePeriod(t, kc, deploymentName, testNamespace, 2, 60) +} + func addTopic(t *testing.T, data templateData, name string, partitions int) { t.Log("--- adding kafka topic" + name + " and partitions " + strconv.Itoa(partitions) + " ---") data.KafkaTopicName = name