Skip to content

Commit

Permalink
Kafka: Limit Scaling to Partitions w/Lag (#5060)
Browse files Browse the repository at this point in the history

Signed-off-by: Dmitrii Senin <[email protected]>
Signed-off-by: Bojan Zelic <[email protected]>
Co-authored-by: Dmitrii Senin <[email protected]>
  • Loading branch information
BojanZelic and DmitrySenin authored Oct 11, 2023
1 parent 853fc61 commit 9a49dff
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 137 deletions.
10 changes: 10 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,16 @@ issues:
- path: datadog_scaler.go
linters:
- gocyclo
# Exclude for kafka_scaler, reason:
# Introduce new parameters to set number of partitions w/lag issue #3997 (PR #5060)
- path: kafka_scaler.go
linters:
- gocyclo
# Exclude for apache_kafka_scaler, reason:
# Introduce new parameters to set number of partitions w/lag issue #3997 (PR #5060)
- path: apache_kafka_scaler.go
linters:
- gocyclo
# Exclude for mongo_scaler and couchdb_scaler, reason:
# pkg/scalers/couchdb_scaler.go:144: 144-174 lines are duplicate of `pkg/scalers/mongo_scaler.go:155-185` (dupl)
- path: couchdb_scaler.go
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ Here is an overview of all new **experimental** features:

- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Kafka Scaler**: Ability to set upper bound to the number of partitions with lag ([#3997](https://github.com/kedacore/keda/issues/3997))
- **Kafka Scaler**: Add support for Kerberos authentication (SASL / GSSAPI) ([#4836](https://github.com/kedacore/keda/issues/4836))

### Fixes
Expand Down
35 changes: 31 additions & 4 deletions pkg/scalers/apache_kafka_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type apacheKafkaMetadata struct {
// If an invalid offset is found, whether to scale to 1 (false - the default) so consumption can
// occur or scale to 0 (true). See discussion in https://github.com/kedacore/keda/issues/2612
scaleToZeroOnInvalidOffset bool
limitToPartitionsWithLag bool

// SASL
saslType kafkaSaslType
Expand Down Expand Up @@ -339,6 +340,22 @@ func parseApacheKafkaMetadata(config *ScalerConfig, logger logr.Logger) (apacheK
meta.scaleToZeroOnInvalidOffset = t
}

meta.limitToPartitionsWithLag = false
if val, ok := config.TriggerMetadata["limitToPartitionsWithLag"]; ok {
t, err := strconv.ParseBool(val)
if err != nil {
return meta, fmt.Errorf("error parsing limitToPartitionsWithLag: %w", err)
}
meta.limitToPartitionsWithLag = t

if meta.allowIdleConsumers && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("allowIdleConsumers and limitToPartitionsWithLag cannot be set simultaneously")
}
if len(meta.topic) == 0 && meta.limitToPartitionsWithLag {
return meta, fmt.Errorf("topic must be specified when using limitToPartitionsWithLag")
}
}

meta.scalerIndex = config.ScalerIndex
return meta, nil
}
Expand Down Expand Up @@ -639,6 +656,7 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro
totalLag := int64(0)
totalLagWithPersistent := int64(0)
totalTopicPartitions := int64(0)
partitionsWithLag := int64(0)

for topic, partitionsOffsets := range producerOffsets {
for partition := range partitionsOffsets {
Expand All @@ -648,17 +666,26 @@ func (s *apacheKafkaScaler) getTotalLag(ctx context.Context) (int64, int64, erro
}
totalLag += lag
totalLagWithPersistent += lagWithPersistent

if lag > 0 {
partitionsWithLag++
}
}
totalTopicPartitions += (int64)(len(partitionsOffsets))
}
s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Providing metrics based on totalLag %v, topicPartitions %v, threshold %v", totalLag, topicPartitions, s.metadata.lagThreshold))

s.logger.V(1).Info(fmt.Sprintf("Kafka scaler: Consumer offsets %v, producer offsets %v", consumerOffsets, producerOffsets))

if !s.metadata.allowIdleConsumers {
// don't scale out beyond the number of topicPartitions
if (totalLag / s.metadata.lagThreshold) > totalTopicPartitions {
totalLag = totalTopicPartitions * s.metadata.lagThreshold
if !s.metadata.allowIdleConsumers || s.metadata.limitToPartitionsWithLag {
// don't scale out beyond the number of topicPartitions or partitionsWithLag depending on settings
upperBound := totalTopicPartitions
if s.metadata.limitToPartitionsWithLag {
upperBound = partitionsWithLag
}

if (totalLag / s.metadata.lagThreshold) > upperBound {
totalLag = upperBound * s.metadata.lagThreshold
}
}
return totalLag, totalLagWithPersistent, nil
Expand Down
78 changes: 46 additions & 32 deletions pkg/scalers/apache_kafka_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
)

type parseApacheKafkaMetadataTestData struct {
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic []string
partitionLimitation []int32
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
metadata map[string]string
isError bool
numBrokers int
brokers []string
group string
topic []string
partitionLimitation []int32
offsetResetPolicy offsetResetPolicy
allowIdleConsumers bool
excludePersistentLag bool
limitToPartitionsWithLag bool
}

type parseApacheKafkaAuthParamsTestData struct {
Expand Down Expand Up @@ -62,49 +63,59 @@ var validApacheKafkaWithoutAuthParams = map[string]string{}

var parseApacheKafkaMetadataTestDataset = []parseApacheKafkaMetadataTestData{
// failure, no consumer group
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false},
{map[string]string{"bootstrapServers": "foobar:9092"}, true, 1, []string{"foobar:9092"}, "", nil, nil, "latest", false, false, false},
// success, no topics
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
// success, ignore partitionLimitation if no topics
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": "1,2,3,4,5,6"}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
// success, no limitation with whitespaced limitation value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": " "}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
// success, no limitation
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, false},
// failure, lagThreshold is negative value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// failure, lagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "0"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, activationLagThreshold is 0
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "10", "activationLagThreshold": "0"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, partitionLimitation as list
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1,2,3,4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false},
// success, partitionLimitation as range
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4}, offsetResetPolicy("latest"), false, false, false},
// success, partitionLimitation mixed list + ranges
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "1-4,8,10-12"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, []int32{1, 2, 3, 4, 8, 10, 11, 12}, offsetResetPolicy("latest"), false, false, false},
// failure, partitionLimitation wrong data type
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "partitionLimitation": "a,b,c,d"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, more brokers
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, offsetResetPolicy policy latest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "latest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// failure, offsetResetPolicy policy wrong
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, "", false, false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "foo"}, true, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, "", false, false, false},
// success, offsetResetPolicy policy earliest
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("earliest"), false, false},
{map[string]string{"bootstrapServers": "foo:9092,bar:9092", "consumerGroup": "my-group", "topic": "my-topics", "offsetResetPolicy": "earliest"}, false, 2, []string{"foo:9092", "bar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("earliest"), false, false, false},
// failure, allowIdleConsumers malformed
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, allowIdleConsumers is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false},
// failure, excludePersistentLag is malformed
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// success, excludePersistentLag is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, true},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "excludePersistentLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, true, false},
// success, version supported
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false},
// success, limitToPartitionsWithLag is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "limitToPartitionsWithLag": "true"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, true},
// failure, limitToPartitionsWithLag is malformed
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "limitToPartitionsWithLag": "notvalid"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false, false},
// failure, allowIdleConsumers and limitToPartitionsWithLag cannot be set to true simultaneously
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, true},
// success, allowIdleConsumers can be set when limitToPartitionsWithLag is false
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "allowIdleConsumers": "true", "limitToPartitionsWithLag": "false"}, false, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), true, false, false},
// failure, topic must be specified when limitToPartitionsWithLag is true
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "limitToPartitionsWithLag": "true"}, true, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false, true},
}

var parseApacheKafkaAuthParamsTestDataset = []parseApacheKafkaAuthParamsTestData{
Expand Down Expand Up @@ -286,6 +297,9 @@ func TestApacheKafkaGetBrokers(t *testing.T) {
if err == nil && meta.excludePersistentLag != testData.excludePersistentLag {
t.Errorf("Expected excludePersistentLag %t but got %t\n", testData.excludePersistentLag, meta.excludePersistentLag)
}
if err == nil && meta.limitToPartitionsWithLag != testData.limitToPartitionsWithLag {
t.Errorf("Expected limitToPartitionsWithLag %t but got %t\n", testData.limitToPartitionsWithLag, meta.limitToPartitionsWithLag)
}
}
}

Expand Down
Loading

0 comments on commit 9a49dff

Please sign in to comment.