Skip to content

Commit

Permalink
Support multiple queues at the IBMMQ scaler (kedacore#6182)
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
Signed-off-by: Rick Brouwer <[email protected]>
Signed-off-by: Jorge Turrado Ferrero <[email protected]>
Co-authored-by: Jorge Turrado Ferrero <[email protected]>
  • Loading branch information
rickbrouwer and JorTurFer authored Nov 5, 2024
1 parent 1e90416 commit 4eb7149
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 41 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Here is an overview of all new **experimental** features:
- **GitHub Scaler**: Add support to not scale on default runner labels ([#6127](https://github.com/kedacore/keda/issues/6127))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214))
- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181))
- **Kafka**: Allow disabling FAST negotation when using Kerberos ([#6188](https://github.com/kedacore/keda/issues/6188))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
Expand Down
126 changes: 87 additions & 39 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ type ibmmqScaler struct {
}

type ibmmqMetadata struct {
Host string `keda:"name=host, order=triggerMetadata"`
QueueName string `keda:"name=queueName, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
Host string `keda:"name=host, order=triggerMetadata"`
QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"`
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
Operation string `keda:"name=operation, order=triggerMetadata, enum=max;avg;sum, default=max"`
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
CA string `keda:"name=ca, order=authParams, optional"`
Cert string `keda:"name=cert, order=authParams, optional"`
Key string `keda:"name=key, order=authParams, optional"`
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`

triggerIndex int
}
Expand Down Expand Up @@ -129,54 +130,101 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro
}

func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
queue := s.metadata.QueueName
depths := make([]int64, 0, len(s.metadata.QueueName))
url := s.metadata.Host

var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
if err != nil {
return 0, fmt.Errorf("failed to request queue depth: %w", err)
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")

req.SetBasicAuth(s.metadata.Username, s.metadata.Password)

resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST: %w", err)
for _, queueName := range s.metadata.QueueName {
requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`)
req.Body = io.NopCloser(bytes.NewBuffer(requestJSON))

resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err)
}
defer resp.Body.Close()

if resp.StatusCode == http.StatusUnauthorized {
return 0, fmt.Errorf("authentication failed: incorrect username or password")
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err)
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName)
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
}
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason)
}

depth := int64(response.CommandResponse[0].Parameters.Curdepth)
depths = append(depths, depth)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to read body of request: %w", err)
switch s.metadata.Operation {
case sumOperation:
return sumDepths(depths), nil
case avgOperation:
return avgDepths(depths), nil
case maxOperation:
return maxDepth(depths), nil
default:
return 0, nil
}
}

var response CommandResponse
err = json.Unmarshal(body, &response)
if err != nil {
return 0, fmt.Errorf("failed to parse JSON: %w", err)
func sumDepths(depths []int64) int64 {
var sum int64
for _, depth := range depths {
sum += depth
}
return sum
}

if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
return 0, fmt.Errorf("failed to parse response from REST call")
func avgDepths(depths []int64) int64 {
if len(depths) == 0 {
return 0
}
return sumDepths(depths) / int64(len(depths))
}

if response.CommandResponse[0].Parameters == nil {
var reason string
message := strings.Join(response.CommandResponse[0].Message, " ")
if message != "" {
reason = fmt.Sprintf(", reason: %s", message)
func maxDepth(depths []int64) int64 {
if len(depths) == 0 {
return 0
}
max := depths[0]
for _, depth := range depths[1:] {
if depth > max {
max = depth
}
return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason)
}

return int64(response.CommandResponse[0].Parameters.Curdepth), nil
return max
}

func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0]))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
Expand Down
12 changes: 10 additions & 2 deletions pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
{map[string]string{}, true, map[string]string{}},
// Properly formed metadata
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed metadata with 2 queues with param queueNames
{map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid operation
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "operation": "test", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid queueDepth using a string
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid activationQueueDepth using a string
Expand Down Expand Up @@ -89,7 +95,7 @@ func TestIBMMQParseMetadata(t *testing.T) {
t.Error("Expected error but got success")
fmt.Println(testData)
}
if metadata != (ibmmqMetadata{}) && metadata.Password != "" && metadata.Password != testData.authParams["password"] {
if metadata.Password != "" && metadata.Password != testData.authParams["password"] {
t.Error("Expected password from configuration but found something else: ", metadata.Password)
fmt.Println(testData)
}
Expand Down Expand Up @@ -216,7 +222,9 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) {

scaler := ibmmqScaler{
metadata: ibmmqMetadata{
Host: server.URL,
Host: server.URL,
QueueName: []string{"TEST.QUEUE"},
Operation: "max",
},
httpClient: server.Client(),
}
Expand Down

0 comments on commit 4eb7149

Please sign in to comment.