From 4eb71492f4dbf17951aa5a28a0940a9692c1cd5e Mon Sep 17 00:00:00 2001 From: rickbrouwer Date: Tue, 5 Nov 2024 22:07:06 +0100 Subject: [PATCH] Support multiple queues at the IBMMQ scaler (#6182) Signed-off-by: rickbrouwer Signed-off-by: Rick Brouwer Signed-off-by: Jorge Turrado Ferrero Co-authored-by: Jorge Turrado Ferrero --- CHANGELOG.md | 1 + pkg/scalers/ibmmq_scaler.go | 126 +++++++++++++++++++++---------- pkg/scalers/ibmmq_scaler_test.go | 12 ++- 3 files changed, 98 insertions(+), 41 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b974ff4bd3..eef43573d9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -85,6 +85,7 @@ Here is an overview of all new **experimental** features: - **GitHub Scaler**: Add support to not scale on default runner labels ([#6127](https://github.com/kedacore/keda/issues/6127)) - **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738)) - **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214)) +- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181)) - **Kafka**: Allow disabling FAST negotation when using Kerberos ([#6188](https://github.com/kedacore/keda/issues/6188)) - **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689)) - **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958)) diff --git a/pkg/scalers/ibmmq_scaler.go b/pkg/scalers/ibmmq_scaler.go index 0b0b4c893f4..11df92c4716 100644 --- a/pkg/scalers/ibmmq_scaler.go +++ b/pkg/scalers/ibmmq_scaler.go @@ -26,18 +26,19 @@ type ibmmqScaler struct { } type ibmmqMetadata struct { - Host string `keda:"name=host, order=triggerMetadata"` - QueueName string `keda:"name=queueName, order=triggerMetadata"` - QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` - ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` - Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` - Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` - UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` - TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead - CA string `keda:"name=ca, order=authParams, optional"` - Cert string `keda:"name=cert, order=authParams, optional"` - Key string `keda:"name=key, order=authParams, optional"` - KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` + Host string `keda:"name=host, order=triggerMetadata"` + QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"` + QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"` + ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"` + Operation string `keda:"name=operation, order=triggerMetadata, enum=max;avg;sum, default=max"` + Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"` + Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"` + UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"` + TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead + CA string `keda:"name=ca, order=authParams, optional"` + Cert string `keda:"name=cert, order=authParams, optional"` + Key string `keda:"name=key, order=authParams, optional"` + KeyPassword string `keda:"name=keyPassword, order=authParams, optional"` triggerIndex int } @@ -129,54 +130,101 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro } func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) { - queue := s.metadata.QueueName + depths := make([]int64, 0, len(s.metadata.QueueName)) url := s.metadata.Host - var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`) - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON)) + req, err := http.NewRequestWithContext(ctx, "POST", url, nil) if err != nil { - return 0, fmt.Errorf("failed to request queue depth: %w", err) + return 0, fmt.Errorf("failed to create HTTP request: %w", err) } req.Header.Set("ibm-mq-rest-csrf-token", "value") req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(s.metadata.Username, s.metadata.Password) - resp, err := s.httpClient.Do(req) - if err != nil { - return 0, fmt.Errorf("failed to contact MQ via REST: %w", err) + for _, queueName := range s.metadata.QueueName { + requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`) + req.Body = io.NopCloser(bytes.NewBuffer(requestJSON)) + + resp, err := s.httpClient.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err) + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusUnauthorized { + return 0, fmt.Errorf("authentication failed: incorrect username or password") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err) + } + + var response CommandResponse + err = json.Unmarshal(body, &response) + if err != nil { + return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err) + } + + if response.CommandResponse == nil || len(response.CommandResponse) == 0 { + return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName) + } + + if response.CommandResponse[0].Parameters == nil { + var reason string + message := strings.Join(response.CommandResponse[0].Message, " ") + if message != "" { + reason = fmt.Sprintf(", reason: %s", message) + } + return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason) + } + + depth := int64(response.CommandResponse[0].Parameters.Curdepth) + depths = append(depths, depth) } - defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return 0, fmt.Errorf("failed to read body of request: %w", err) + switch s.metadata.Operation { + case sumOperation: + return sumDepths(depths), nil + case avgOperation: + return avgDepths(depths), nil + case maxOperation: + return maxDepth(depths), nil + default: + return 0, nil } +} - var response CommandResponse - err = json.Unmarshal(body, &response) - if err != nil { - return 0, fmt.Errorf("failed to parse JSON: %w", err) +func sumDepths(depths []int64) int64 { + var sum int64 + for _, depth := range depths { + sum += depth } + return sum +} - if response.CommandResponse == nil || len(response.CommandResponse) == 0 { - return 0, fmt.Errorf("failed to parse response from REST call") +func avgDepths(depths []int64) int64 { + if len(depths) == 0 { + return 0 } + return sumDepths(depths) / int64(len(depths)) +} - if response.CommandResponse[0].Parameters == nil { - var reason string - message := strings.Join(response.CommandResponse[0].Message, " ") - if message != "" { - reason = fmt.Sprintf(", reason: %s", message) +func maxDepth(depths []int64) int64 { + if len(depths) == 0 { + return 0 + } + max := depths[0] + for _, depth := range depths[1:] { + if depth > max { + max = depth } - return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason) } - - return int64(response.CommandResponse[0].Parameters.Curdepth), nil + return max } func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec { - metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName)) + metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0])) externalMetric := &v2.ExternalMetricSource{ Metric: v2.MetricIdentifier{ Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName), diff --git a/pkg/scalers/ibmmq_scaler_test.go b/pkg/scalers/ibmmq_scaler_test.go index 30a7fc4b132..f5c1d73aa2b 100644 --- a/pkg/scalers/ibmmq_scaler_test.go +++ b/pkg/scalers/ibmmq_scaler_test.go @@ -51,6 +51,12 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{ {map[string]string{}, true, map[string]string{}}, // Properly formed metadata {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Properly formed metadata with 2 queues + {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Properly formed metadata with 2 queues with param queueNames + {map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}}, + // Invalid operation + {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "operation": "test", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid queueDepth using a string {map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}}, // Invalid activationQueueDepth using a string @@ -89,7 +95,7 @@ func TestIBMMQParseMetadata(t *testing.T) { t.Error("Expected error but got success") fmt.Println(testData) } - if metadata != (ibmmqMetadata{}) && metadata.Password != "" && metadata.Password != testData.authParams["password"] { + if metadata.Password != "" && metadata.Password != testData.authParams["password"] { t.Error("Expected password from configuration but found something else: ", metadata.Password) fmt.Println(testData) } @@ -216,7 +222,9 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) { scaler := ibmmqScaler{ metadata: ibmmqMetadata{ - Host: server.URL, + Host: server.URL, + QueueName: []string{"TEST.QUEUE"}, + Operation: "max", }, httpClient: server.Client(), }