Skip to content

Commit

Permalink
Add better default job parallelism and task manager slots (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored May 6, 2021
1 parent b567bc8 commit a3b6db0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
4 changes: 0 additions & 4 deletions api/v1beta1/flinkcluster_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ func _SetJobDefault(jobSpec *JobSpec) {
jobSpec.AllowNonRestoredState = new(bool)
*jobSpec.AllowNonRestoredState = false
}
if jobSpec.Parallelism == nil {
jobSpec.Parallelism = new(int32)
*jobSpec.Parallelism = 1
}
if jobSpec.NoLoggingToStdout == nil {
jobSpec.NoLoggingToStdout = new(bool)
*jobSpec.NoLoggingToStdout = false
Expand Down
2 changes: 0 additions & 2 deletions api/v1beta1/flinkcluster_default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func TestSetDefault(t *testing.T) {
var defaultTmRPCPort = int32(6122)
var defaultTmQueryPort = int32(6125)
var defaultJobAllowNonRestoredState = false
var defaultJobParallelism = int32(1)
var defaultJobNoLoggingToStdout = false
var defaultJobRestartPolicy = JobRestartPolicyNever
var defatulJobManagerIngressTLSUse = false
Expand Down Expand Up @@ -100,7 +99,6 @@ func TestSetDefault(t *testing.T) {
},
Job: &JobSpec{
AllowNonRestoredState: &defaultJobAllowNonRestoredState,
Parallelism: &defaultJobParallelism,
NoLoggingToStdout: &defaultJobNoLoggingToStdout,
RestartPolicy: &defaultJobRestartPolicy,
CleanupPolicy: &CleanupPolicy{
Expand Down
51 changes: 48 additions & 3 deletions controllers/flinkcluster_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,10 @@ func getDesiredConfigMap(
}
}

if taskSlots := calTaskManagerTaskSlots(flinkCluster); taskSlots != nil {
flinkProps["taskmanager.numberOfTaskSlots"] = strconv.Itoa(*taskSlots)
}

// Add custom Flink properties.
for k, v := range flinkProperties {
// Do not allow to override properties from real deployment.
Expand Down Expand Up @@ -656,10 +660,11 @@ func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
*jobSpec.AllowNonRestoredState == true {
jobArgs = append(jobArgs, "--allowNonRestoredState")
}
if jobSpec.Parallelism != nil {
jobArgs = append(
jobArgs, "--parallelism", fmt.Sprint(*jobSpec.Parallelism))

if parallelism := calJobParallelism(flinkCluster); parallelism != nil {
jobArgs = append(jobArgs, "--parallelism", fmt.Sprint(*parallelism))
}

if jobSpec.NoLoggingToStdout != nil &&
*jobSpec.NoLoggingToStdout == true {
jobArgs = append(jobArgs, "--sysoutLogging")
Expand Down Expand Up @@ -917,6 +922,46 @@ func shouldCleanup(
return false
}

func calJobParallelism(cluster *v1beta1.FlinkCluster) *int32 {
if cluster.Spec.Job.Parallelism != nil {
return cluster.Spec.Job.Parallelism
}

tmReplicas := cluster.Spec.TaskManager.Replicas
ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"]
if !ok {
return nil
}
value, err := strconv.Atoi(ts)
if err != nil {
return nil
}

parallelism := tmReplicas * int32(value)
return &parallelism
}

func calTaskManagerTaskSlots(cluster *v1beta1.FlinkCluster) *int {
if ts, ok := cluster.Spec.FlinkProperties["taskmanager.numberOfTaskSlots"]; ok {
value, err := strconv.Atoi(ts)
if err != nil {
return nil
}
return &value
}

quantity := cluster.Spec.TaskManager.Resources.Limits.Cpu()
if quantity == nil {
return nil
}

s := 1
if slots := int(quantity.ToDec().AsApproximateFloat64()); slots != 0 {
s = slots / 2
}
return &s
}

func calFlinkHeapSize(cluster *v1beta1.FlinkCluster) map[string]string {
if cluster.Spec.JobManager.MemoryOffHeapRatio == nil {
return nil
Expand Down

0 comments on commit a3b6db0

Please sign in to comment.