diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types_util.go b/apis/flinkcluster/v1beta1/flinkcluster_types_util.go index 5e5cb148..312118b4 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types_util.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types_util.go @@ -64,20 +64,15 @@ func (j *JobStatus) IsSavepointUpToDate(spec *JobSpec, compareTime time.Time) bo } // ShouldRestart returns true if the controller should restart failed job. -// The controller can restart the job only if there is a savepoint that is close to the end time of the job. +// The controller can restart the job if policy is set to FromSavepointOnFailure. +// Job will restart from savepoint if the savepoint was taken successfully. func (j *JobStatus) ShouldRestart(spec *JobSpec) bool { if j == nil || !j.IsFailed() || spec == nil { return false } restartEnabled := spec.RestartPolicy != nil && *spec.RestartPolicy == JobRestartPolicyFromSavepointOnFailure - - var jobCompletionTime time.Time - if j.CompletionTime != nil { - jobCompletionTime = j.CompletionTime.Time - } - - return restartEnabled && j.IsSavepointUpToDate(spec, jobCompletionTime) + return restartEnabled } // UpdateReady returns true if job is ready to proceed update. diff --git a/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go b/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go index fceba3db..40a04bf1 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go @@ -124,7 +124,7 @@ func TestShouldRestartJob(t *testing.T) { CompletionTime: &metav1.Time{Time: jobCompletionTime}, } restart = jobStatus.ShouldRestart(&jobSpec) - assert.Equal(t, restart, false) + assert.Equal(t, restart, true) // Not restart with restartPolicy Never jobSpec = JobSpec{