diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index f65f0f98..78c42c0d 100644 --- a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -944,17 +944,22 @@ func calJobParallelism(cluster *v1beta1.FlinkCluster) *int32 { return cluster.Spec.Job.Parallelism } - tmReplicas := cluster.Spec.TaskManager.Replicas - ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"] - if !ok { - return nil + var value *int + if ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"]; ok { + parsed, err := strconv.Atoi(ts) + if err != nil { + return nil + } + value = &parsed + } else { + value = calTaskManagerTaskSlots(cluster) } - value, err := strconv.Atoi(ts) - if err != nil { + + if value == nil { return nil } - parallelism := tmReplicas * int32(value) + parallelism := cluster.Spec.TaskManager.Replicas * int32(*value) return ¶llelism } diff --git a/controllers/flinkcluster_converter_test.go b/controllers/flinkcluster_converter_test.go index 8063b278..dfb9a231 100644 --- a/controllers/flinkcluster_converter_test.go +++ b/controllers/flinkcluster_converter_test.go @@ -268,8 +268,7 @@ func TestGetDesiredClusterState(t *testing.T) { }, SecurityContext: &securityContext, }, - FlinkProperties: map[string]string{"taskmanager.numberOfTaskSlots": "1"}, - EnvVars: []corev1.EnvVar{{Name: "FOO", Value: "abc"}}, + EnvVars: []corev1.EnvVar{{Name: "FOO", Value: "abc"}}, EnvFrom: []corev1.EnvFromSource{{ConfigMapRef: &corev1.ConfigMapEnvSource{ LocalObjectReference: corev1.LocalObjectReference{ Name: "FOOMAP",