Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Transition to Queue if the JobCondition is empty #387

Merged
merged 12 commits into from
Sep 1, 2023
24 changes: 5 additions & 19 deletions go/tasks/plugins/k8s/kfoperators/common/common_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,6 @@ type ReplicaEntry struct {
RestartPolicy commonOp.RestartPolicy
}

// ExtractMPICurrentCondition will return the first job condition for MPI
func ExtractMPICurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
if jobConditions != nil {
sort.Slice(jobConditions, func(i, j int) bool {
return jobConditions[i].LastTransitionTime.Time.After(jobConditions[j].LastTransitionTime.Time)
})

for _, jc := range jobConditions {
if jc.Status == v1.ConditionTrue {
return jc, nil
}
}
}

return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
}

// ExtractCurrentCondition will return the first job condition for tensorflow/pytorch
func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.JobCondition, error) {
if jobConditions != nil {
Expand All @@ -60,14 +43,17 @@ func ExtractCurrentCondition(jobConditions []commonOp.JobCondition) (commonOp.Jo
return jc, nil
}
}
return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
}

return commonOp.JobCondition{}, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions)
return commonOp.JobCondition{}, nil
}

// GetPhaseInfo will return the phase of kubeflow job
func GetPhaseInfo(currentCondition commonOp.JobCondition, occurredAt time.Time,
taskPhaseInfo pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) {
if len(currentCondition.Type) == 0 {
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
}
switch currentCondition.Type {
case commonOp.JobCreated:
return pluginsCore.PhaseInfoQueued(occurredAt, pluginsCore.DefaultPhaseVersion, "JobCreated"), nil
Expand Down
39 changes: 16 additions & 23 deletions go/tasks/plugins/k8s/kfoperators/common/common_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestExtractMPICurrentCondition(t *testing.T) {
func TestExtractCurrentCondition(t *testing.T) {
jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
Status: corev1.ConditionTrue,
Expand All @@ -31,46 +31,39 @@ func TestExtractMPICurrentCondition(t *testing.T) {
jobCreated,
jobRunningActive,
}
currentCondition, err := ExtractMPICurrentCondition(jobConditions)
currentCondition, err := ExtractCurrentCondition(jobConditions)
assert.NoError(t, err)
assert.Equal(t, currentCondition, jobCreated)

jobConditions = nil
currentCondition, err = ExtractMPICurrentCondition(jobConditions)
assert.Error(t, err)
currentCondition, err = ExtractCurrentCondition(jobConditions)
assert.NoError(t, err)
assert.Equal(t, currentCondition, commonOp.JobCondition{})
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
}

func TestExtractCurrentCondition(t *testing.T) {
jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
Status: corev1.ConditionTrue,
}
jobRunningActive := commonOp.JobCondition{
Type: commonOp.JobRunning,
Status: corev1.ConditionFalse,
}
jobConditions := []commonOp.JobCondition{
jobCreated,
jobRunningActive,
}
currentCondition, err := ExtractCurrentCondition(jobConditions)
currentCondition, err = ExtractCurrentCondition(nil)
assert.NoError(t, err)
assert.Equal(t, currentCondition, jobCreated)
assert.Equal(t, currentCondition, commonOp.JobCondition{})

jobConditions = nil
jobUnknown := commonOp.JobCondition{Type: "unknown"}
jobConditions = []commonOp.JobCondition{jobUnknown}
currentCondition, err = ExtractCurrentCondition(jobConditions)
assert.Error(t, err)
assert.Equal(t, currentCondition, commonOp.JobCondition{})
assert.Equal(t, err, fmt.Errorf("found no current condition. Conditions: %+v", jobConditions))
}

func TestGetPhaseInfo(t *testing.T) {
jobCreating := commonOp.JobCondition{}
taskPhase, err := GetPhaseInfo(jobCreating, time.Now(), pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
assert.Nil(t, err)

jobCreated := commonOp.JobCondition{
Type: commonOp.JobCreated,
}
taskPhase, err := GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
taskPhase, err = GetPhaseInfo(jobCreated, time.Now(), pluginsCore.TaskInfo{})
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, taskPhase.Phase())
assert.NotNil(t, taskPhase.Info())
Expand Down
32 changes: 32 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package common

import (
"time"

pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
"github.com/flyteorg/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig

var (
defaultConfig = Config{
Timeout: config.Duration{Duration: 1 * time.Minute},
}

configSection = pluginsConfig.MustRegisterSubSection("kf-operator", &defaultConfig)
)

// Config is config for 'pytorch' plugin
type Config struct {
// If kubeflow operator doesn't update the status of the task after this timeout, the task will be considered failed.
Timeout config.Duration `json:"timeout,omitempty"`
}

func GetConfig() *Config {
return configSection.GetConfig().(*Config)

Check warning on line 27 in go/tasks/plugins/k8s/kfoperators/common/config.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/plugins/k8s/kfoperators/common/config.go#L26-L27

Added lines #L26 - L27 were not covered by tests
}

func SetConfig(cfg *Config) error {
return configSection.SetConfig(cfg)

Check warning on line 31 in go/tasks/plugins/k8s/kfoperators/common/config.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/plugins/k8s/kfoperators/common/config.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}
55 changes: 55 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/common/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

116 changes: 116 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/common/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions go/tasks/plugins/k8s/kfoperators/mpi/mpi.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
currentCondition, err := common.ExtractMPICurrentCondition(app.Status.Conditions)
if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the mpi custom resource since creation time %v", app.CreationTimestamp)
}

Check warning on line 214 in go/tasks/plugins/k8s/kfoperators/mpi/mpi.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/plugins/k8s/kfoperators/mpi/mpi.go#L213-L214

Added lines #L213 - L214 were not covered by tests
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}
Expand All @@ -223,7 +226,6 @@
}

return common.GetMPIPhaseInfo(currentCondition, occurredAt, taskPhaseInfo)

}

func init() {
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/kfoperators/mpi/mpi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func dummyMPIJobResource(mpiResourceHandler mpiOperatorResourceHandler,
Status: mpiOp.JobStatus{
Conditions: jobConditions,
ReplicaStatuses: nil,
StartTime: nil,
StartTime: &v1.Time{Time: time.Now()},
CompletionTime: nil,
LastReconcileTime: nil,
},
Expand Down
3 changes: 3 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the pytorch custom resource since creation time %v", app.CreationTimestamp)
}

Check warning on line 236 in go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/plugins/k8s/kfoperators/pytorch/pytorch.go#L235-L236

Added lines #L235 - L236 were not covered by tests
currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
5 changes: 3 additions & 2 deletions go/tasks/plugins/k8s/kfoperators/pytorch/pytorch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,9 @@ func dummyPytorchJobResource(pytorchResourceHandler pytorchOperatorResourceHandl

return &kubeflowv1.PyTorchJob{
ObjectMeta: v1.ObjectMeta{
Name: jobName,
Namespace: jobNamespace,
CreationTimestamp: v1.Time{Time: time.Now()},
Name: jobName,
Namespace: jobNamespace,
},
Spec: resource.(*kubeflowv1.PyTorchJob).Spec,
Status: commonOp.JobStatus{
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,10 @@
return pluginsCore.PhaseInfoUndefined, err
}

if app.Status.StartTime == nil && app.CreationTimestamp.Add(common.GetConfig().Timeout.Duration).Before(time.Now()) {
return pluginsCore.PhaseInfoUndefined, fmt.Errorf("kubeflow operator hasn't updated the tensorflow custom resource since creation time %v", app.CreationTimestamp)
}

Check warning on line 214 in go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go

View check run for this annotation

Codecov / codecov/patch

go/tasks/plugins/k8s/kfoperators/tensorflow/tensorflow.go#L213-L214

Added lines #L213 - L214 were not covered by tests

currentCondition, err := common.ExtractCurrentCondition(app.Status.Conditions)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func dummyTensorFlowJobResource(tensorflowResourceHandler tensorflowOperatorReso
Status: commonOp.JobStatus{
Conditions: jobConditions,
ReplicaStatuses: nil,
StartTime: nil,
StartTime: &v1.Time{Time: time.Now()},
CompletionTime: nil,
LastReconcileTime: nil,
},
Expand Down
Loading