From a3b6db03713cf60cbe5a1a3306ce1031a10e4fe6 Mon Sep 17 00:00:00 2001 From: Filipe Regadas Date: Thu, 6 May 2021 10:04:42 +0100 Subject: [PATCH] Add better default job parallelism and task manager slots (#29) --- api/v1beta1/flinkcluster_default.go | 4 -- api/v1beta1/flinkcluster_default_test.go | 2 - controllers/flinkcluster_converter.go | 51 ++++++++++++++++++++++-- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/api/v1beta1/flinkcluster_default.go b/api/v1beta1/flinkcluster_default.go index 680b4d0a..380120ef 100644 --- a/api/v1beta1/flinkcluster_default.go +++ b/api/v1beta1/flinkcluster_default.go @@ -130,10 +130,6 @@ func _SetJobDefault(jobSpec *JobSpec) { jobSpec.AllowNonRestoredState = new(bool) *jobSpec.AllowNonRestoredState = false } - if jobSpec.Parallelism == nil { - jobSpec.Parallelism = new(int32) - *jobSpec.Parallelism = 1 - } if jobSpec.NoLoggingToStdout == nil { jobSpec.NoLoggingToStdout = new(bool) *jobSpec.NoLoggingToStdout = false diff --git a/api/v1beta1/flinkcluster_default_test.go b/api/v1beta1/flinkcluster_default_test.go index 59a2cb3e..cef7a71a 100644 --- a/api/v1beta1/flinkcluster_default_test.go +++ b/api/v1beta1/flinkcluster_default_test.go @@ -49,7 +49,6 @@ func TestSetDefault(t *testing.T) { var defaultTmRPCPort = int32(6122) var defaultTmQueryPort = int32(6125) var defaultJobAllowNonRestoredState = false - var defaultJobParallelism = int32(1) var defaultJobNoLoggingToStdout = false var defaultJobRestartPolicy = JobRestartPolicyNever var defatulJobManagerIngressTLSUse = false @@ -100,7 +99,6 @@ func TestSetDefault(t *testing.T) { }, Job: &JobSpec{ AllowNonRestoredState: &defaultJobAllowNonRestoredState, - Parallelism: &defaultJobParallelism, NoLoggingToStdout: &defaultJobNoLoggingToStdout, RestartPolicy: &defaultJobRestartPolicy, CleanupPolicy: &CleanupPolicy{ diff --git a/controllers/flinkcluster_converter.go b/controllers/flinkcluster_converter.go index 2e496e13..238aa74c 100644 --- 
a/controllers/flinkcluster_converter.go +++ b/controllers/flinkcluster_converter.go @@ -585,6 +585,10 @@ func getDesiredConfigMap( } } + if taskSlots := calTaskManagerTaskSlots(flinkCluster); taskSlots != nil { + flinkProps["taskmanager.numberOfTaskSlots"] = strconv.Itoa(*taskSlots) + } + // Add custom Flink properties. for k, v := range flinkProperties { // Do not allow to override properties from real deployment. @@ -656,10 +660,11 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job { *jobSpec.AllowNonRestoredState == true { jobArgs = append(jobArgs, "--allowNonRestoredState") } - if jobSpec.Parallelism != nil { - jobArgs = append( - jobArgs, "--parallelism", fmt.Sprint(*jobSpec.Parallelism)) + + if parallelism := calJobParallelism(flinkCluster); parallelism != nil { + jobArgs = append(jobArgs, "--parallelism", fmt.Sprint(*parallelism)) } + if jobSpec.NoLoggingToStdout != nil && *jobSpec.NoLoggingToStdout == true { jobArgs = append(jobArgs, "--sysoutLogging") @@ -917,6 +922,46 @@ func shouldCleanup( return false } +func calJobParallelism(cluster *v1beta1.FlinkCluster) *int32 { + if cluster.Spec.Job.Parallelism != nil { + return cluster.Spec.Job.Parallelism + } + + tmReplicas := cluster.Spec.TaskManager.Replicas + ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"] + if !ok { + return nil + } + value, err := strconv.Atoi(ts) + if err != nil { + return nil + } + + parallelism := tmReplicas * int32(value) + return &parallelism +} + +func calTaskManagerTaskSlots(cluster *v1beta1.FlinkCluster) *int { + if ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"]; ok { + value, err := strconv.Atoi(ts) + if err != nil { + return nil + } + return &value + } + + quantity := cluster.Spec.TaskManager.Resources.Limits.Cpu() + if quantity == nil { + return nil + } + + s := 1 + if slots := int(quantity.ToDec().AsApproximateFloat64()); slots != 0 { + s = slots / 2 + } + return &s +} + func calFlinkHeapSize(cluster
*v1beta1.FlinkCluster) map[string]string { if cluster.Spec.JobManager.MemoryOffHeapRatio == nil { return nil