From ee6d215396414039ab67b7070b2cc4ea8ae91fad Mon Sep 17 00:00:00 2001 From: Dhruv Batheja Date: Thu, 4 Aug 2022 11:29:35 +0200 Subject: [PATCH] Job cancel should tear down the cluster without waiting (#443) --- .../v1beta1/flinkcluster_default.go | 3 ++ .../flinkcluster/flinkcluster_reconciler.go | 33 ++++++++++++++----- .../flinkcluster/flinkcluster_updater.go | 7 +++- controllers/flinkcluster/flinkcluster_util.go | 17 ++++++++++ 4 files changed, 51 insertions(+), 9 deletions(-) diff --git a/apis/flinkcluster/v1beta1/flinkcluster_default.go b/apis/flinkcluster/v1beta1/flinkcluster_default.go index 7ac0956e..ca98421d 100644 --- a/apis/flinkcluster/v1beta1/flinkcluster_default.go +++ b/apis/flinkcluster/v1beta1/flinkcluster_default.go @@ -17,6 +17,8 @@ limitations under the License. package v1beta1 import ( + "time" + "github.com/hashicorp/go-version" "github.com/imdario/mergo" corev1 "k8s.io/api/core/v1" @@ -27,6 +29,7 @@ import ( const ( DefaultJobManagerReplicas = 1 DefaultTaskManagerReplicas = 3 + ForceTearDownAfter = time.Second * 10 ) var ( diff --git a/controllers/flinkcluster/flinkcluster_reconciler.go b/controllers/flinkcluster/flinkcluster_reconciler.go index ddd9ce06..3a9fd792 100644 --- a/controllers/flinkcluster/flinkcluster_reconciler.go +++ b/controllers/flinkcluster/flinkcluster_reconciler.go @@ -793,9 +793,27 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { return requeueResult, nil } - // Job cancel requested. Stop Flink job. - if desiredJob == nil { - if job.IsActive() { + // Job cancel requested or job finished. Stop Flink job and kill job-submitter. 
+ if desiredJob == nil && !(job.IsStopped() && observedSubmitter == nil) { + if shouldForceTearDown(observed.cluster.Status.Control) { + log.Info("Force tearing down cluster") + userControl := getNewControlRequest(observed.cluster) + if userControl == v1beta1.ControlNameJobCancel { + newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) + } + // cancel all running jobs + if job.IsActive() { + if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { + return requeueResult, err + } + } + // kill job submitter pod + if observedSubmitter != nil { + if err := reconciler.deleteJob(observedSubmitter); err != nil { + return requeueResult, err + } + } + } else if job.IsActive() { userControl := getNewControlRequest(observed.cluster) if userControl == v1beta1.ControlNameJobCancel { newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress) @@ -805,11 +823,7 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil { return requeueResult, err } - - return requeueResult, err - } - - if job.IsStopped() && observedSubmitter != nil { + } else if job.IsStopped() && observedSubmitter != nil { if observed.cluster.Status.Components.Job.SubmitterExitCode == -1 { log.Info("Job submitter has not finished yet") return requeueResult, fmt.Errorf("wait for jobSubmitter to exit") @@ -818,6 +832,9 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) { return requeueResult, err } } + + // to make sure the job is stopped + return requeueResult, nil } if job.IsStopped() { diff --git a/controllers/flinkcluster/flinkcluster_updater.go b/controllers/flinkcluster/flinkcluster_updater.go index 3b182c5d..da01ab4a 100644 --- a/controllers/flinkcluster/flinkcluster_updater.go +++ b/controllers/flinkcluster/flinkcluster_updater.go @@ -614,6 +614,11 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() 
*v1beta1.JobStatus { if oldJob.SubmitterExitCode != exitCode && isNonZeroExitCode(exitCode) { newJob.FailureReasons = append(newJob.FailureReasons, reason) } + } else if observedSubmitter.job == nil || observed.flinkJobSubmitter.pod == nil { + // Submitter is nil, so the submitter exit code shouldn't be "running" + if oldJob != nil && oldJob.SubmitterExitCode == -1 { + newJob.SubmitterExitCode = 0 + } } var newJobState string @@ -984,7 +989,7 @@ func deriveControlStatus( var c *v1beta1.FlinkClusterControlStatus // New control status - if controlRequest != "" { + if controlStatusChanged(cluster, controlRequest) { c = getControlStatus(controlRequest, v1beta1.ControlStateRequested) return c } diff --git a/controllers/flinkcluster/flinkcluster_util.go b/controllers/flinkcluster/flinkcluster_util.go index ee3e6400..87c20ebe 100644 --- a/controllers/flinkcluster/flinkcluster_util.go +++ b/controllers/flinkcluster/flinkcluster_util.go @@ -145,6 +145,7 @@ func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool { (savepointStatus == nil || savepointStatus.State != v1beta1.SavepointStateInProgress) } +// Checks if the job should be stopped because a job-cancel was requested func shouldStopJob(cluster *v1beta1.FlinkCluster) bool { var userControl = cluster.Annotations[v1beta1.ControlAnnotation] var cancelRequested = cluster.Spec.Job.CancelRequested @@ -259,6 +260,17 @@ func getControlStatus(controlName string, state string) *v1beta1.FlinkClusterCon return controlStatus } +func controlStatusChanged(cluster *v1beta1.FlinkCluster, controlName string) bool { + if controlName == "" { + return false + } + var recorded = cluster.Status + if recorded.Control == nil || recorded.Control.Name != controlName { + return true + } + return false +} + func getControlEvent(status v1beta1.FlinkClusterControlStatus) (eventType string, eventReason string, eventMessage string) { var msg = status.Message if len(msg) > 100 { @@ -497,3 +509,8 @@ func IsApplicationModeCluster(cluster 
*v1beta1.FlinkCluster) bool {
 	jobSpec := cluster.Spec.Job
 	return jobSpec != nil && *jobSpec.Mode == v1beta1.JobModeApplication
 }
+
+// shouldForceTearDown reports whether a job-cancel request has been pending longer than ForceTearDownAfter, meaning the cluster should be torn down without waiting any further
+func shouldForceTearDown(controlStatus *v1beta1.FlinkClusterControlStatus) bool {
+	return controlStatus != nil && controlStatus.Name == v1beta1.ControlNameJobCancel && time.Since(util.GetTime(controlStatus.UpdateTime)) > v1beta1.ForceTearDownAfter
+}