Job cancel should tear down the cluster without waiting (#443)
live-wire authored Aug 4, 2022
1 parent 53df4ad commit ee6d215
Showing 4 changed files with 51 additions and 9 deletions.
3 changes: 3 additions & 0 deletions apis/flinkcluster/v1beta1/flinkcluster_default.go
@@ -17,6 +17,8 @@ limitations under the License.
package v1beta1

import (
"time"

"github.com/hashicorp/go-version"
"github.com/imdario/mergo"
corev1 "k8s.io/api/core/v1"
@@ -27,6 +29,7 @@ import (
const (
DefaultJobManagerReplicas = 1
DefaultTaskManagerReplicas = 3
ForceTearDownAfter = time.Second * 10
)

var (
33 changes: 25 additions & 8 deletions controllers/flinkcluster/flinkcluster_reconciler.go
Original file line number Diff line number Diff line change
@@ -793,9 +793,27 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
return requeueResult, nil
}

// Job cancel requested. Stop Flink job.
if desiredJob == nil {
if job.IsActive() {
// Job cancel requested or job finished. Stop Flink job and kill job-submitter.
if desiredJob == nil && !(job.IsStopped() && observedSubmitter == nil) {
if shouldForceTearDown(observed.cluster.Status.Control) {
log.Info("Force tearing down cluster")
userControl := getNewControlRequest(observed.cluster)
if userControl == v1beta1.ControlNameJobCancel {
newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
}
// cancel all running jobs
if job.IsActive() {
if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil {
return requeueResult, err
}
}
// kill job submitter pod
if observedSubmitter != nil {
if err := reconciler.deleteJob(observedSubmitter); err != nil {
return requeueResult, err
}
}
} else if job.IsActive() {
userControl := getNewControlRequest(observed.cluster)
if userControl == v1beta1.ControlNameJobCancel {
newControlStatus = getControlStatus(userControl, v1beta1.ControlStateInProgress)
@@ -805,11 +823,7 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
if err := reconciler.cancelRunningJobs(true /* takeSavepoint */); err != nil {
return requeueResult, err
}

return requeueResult, err
}

if job.IsStopped() && observedSubmitter != nil {
} else if job.IsStopped() && observedSubmitter != nil {
if observed.cluster.Status.Components.Job.SubmitterExitCode == -1 {
log.Info("Job submitter has not finished yet")
return requeueResult, fmt.Errorf("wait for jobSubmitter to exit")
@@ -818,6 +832,9 @@ func (reconciler *ClusterReconciler) reconcileJob() (ctrl.Result, error) {
return requeueResult, err
}
}

// to make sure the job is stopped
return requeueResult, nil
}

if job.IsStopped() {
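In outline, the reworked teardown path in reconcileJob does the following: once the job-cancel control has been pending longer than the grace period, the cluster is torn down immediately (cancel running jobs, delete the submitter pod) instead of waiting for the job or the submitter to finish on their own. Below is a minimal sketch of that decision flow; observedState, its fields, and the returned strings are illustrative stand-ins, not the operator's real types.

package main

import (
	"fmt"
	"time"
)

// Illustrative stand-ins for the operator's observed state; not the real types.
type observedState struct {
	jobActive         bool
	jobStopped        bool
	submitterPresent  bool
	cancelRequestedAt time.Time
}

const forceTearDownAfter = 10 * time.Second // mirrors v1beta1.ForceTearDownAfter

// reconcileTeardown sketches the branch added in reconcileJob: after the
// grace period has elapsed, tear the cluster down without further waiting.
func reconcileTeardown(o observedState) string {
	if time.Since(o.cancelRequestedAt) > forceTearDownAfter {
		// Force path: cancel running jobs and delete the submitter pod now.
		return "force tear down: cancel jobs, delete submitter"
	}
	if o.jobActive {
		return "cancel running job, requeue"
	}
	if o.jobStopped && o.submitterPresent {
		return "wait for submitter to exit, then delete it"
	}
	return "requeue until the job is confirmed stopped"
}

func main() {
	o := observedState{jobActive: true, cancelRequestedAt: time.Now().Add(-30 * time.Second)}
	fmt.Println(reconcileTeardown(o)) // force tear down: cancel jobs, delete submitter
}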
7 changes: 6 additions & 1 deletion controllers/flinkcluster/flinkcluster_updater.go
@@ -614,6 +614,11 @@ func (updater *ClusterStatusUpdater) deriveJobStatus() *v1beta1.JobStatus {
if oldJob.SubmitterExitCode != exitCode && isNonZeroExitCode(exitCode) {
newJob.FailureReasons = append(newJob.FailureReasons, reason)
}
} else if observedSubmitter.job == nil || observed.flinkJobSubmitter.pod == nil {
// Submitter is nil, so the submitter exit code shouldn't be "running"
if oldJob != nil && oldJob.SubmitterExitCode == -1 {
newJob.SubmitterExitCode = 0
}
}

var newJobState string
@@ -984,7 +989,7 @@ func deriveControlStatus(
var c *v1beta1.FlinkClusterControlStatus

// New control status
if controlRequest != "" {
if controlStatusChanged(cluster, controlRequest) {
c = getControlStatus(controlRequest, v1beta1.ControlStateRequested)
return c
}
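The deriveControlStatus change above means a new "Requested" control status is only recorded when the requested control actually differs from the one already in the cluster status, so a control that is already in progress is not reset on every reconcile. A rough sketch of that guard, using simplified stand-in structs (the "job-cancel" string is only an illustrative value for ControlNameJobCancel):

package main

import "fmt"

// Simplified stand-ins for the cluster status and control status types.
type controlStatus struct{ Name, State string }
type clusterStatus struct{ Control *controlStatus }

// controlStatusChanged mirrors the helper added in flinkcluster_util.go:
// an empty request never changes anything, and a request that matches the
// recorded control name is treated as already handled.
func controlStatusChanged(recorded clusterStatus, controlName string) bool {
	if controlName == "" {
		return false
	}
	return recorded.Control == nil || recorded.Control.Name != controlName
}

func main() {
	inProgress := clusterStatus{Control: &controlStatus{Name: "job-cancel", State: "InProgress"}}
	fmt.Println(controlStatusChanged(inProgress, "job-cancel"))    // false: keep the in-progress status
	fmt.Println(controlStatusChanged(clusterStatus{}, "job-cancel")) // true: record a new request
}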
17 changes: 17 additions & 0 deletions controllers/flinkcluster/flinkcluster_util.go
@@ -145,6 +145,7 @@ func canTakeSavepoint(cluster *v1beta1.FlinkCluster) bool {
(savepointStatus == nil || savepointStatus.State != v1beta1.SavepointStateInProgress)
}

// Checks if the job should be stopped because a job-cancel was requested
func shouldStopJob(cluster *v1beta1.FlinkCluster) bool {
var userControl = cluster.Annotations[v1beta1.ControlAnnotation]
var cancelRequested = cluster.Spec.Job.CancelRequested
@@ -259,6 +260,17 @@ func getControlStatus(controlName string, state string) *v1beta1.FlinkClusterControlStatus {
return controlStatus
}

func controlStatusChanged(cluster *v1beta1.FlinkCluster, controlName string) bool {
if controlName == "" {
return false
}
var recorded = cluster.Status
if recorded.Control == nil || recorded.Control.Name != controlName {
return true
}
return false
}

func getControlEvent(status v1beta1.FlinkClusterControlStatus) (eventType string, eventReason string, eventMessage string) {
var msg = status.Message
if len(msg) > 100 {
@@ -497,3 +509,8 @@ func IsApplicationModeCluster(cluster *v1beta1.FlinkCluster) bool {
jobSpec := cluster.Spec.Job
return jobSpec != nil && *jobSpec.Mode == v1beta1.JobModeApplication
}

// checks if reasonable amount of time has passed since job-cancel was requested
func shouldForceTearDown(controlStatus *v1beta1.FlinkClusterControlStatus) bool {
return controlStatus != nil && controlStatus.Name == v1beta1.ControlNameJobCancel && time.Since(util.GetTime(controlStatus.UpdateTime)) > v1beta1.ForceTearDownAfter
}
