Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw committed Nov 17, 2023
1 parent 1a3d6f2 commit 040745e
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executors

import (
"context"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package executors

import (
"context"
"testing"

"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks"
"github.com/stretchr/testify/assert"
)

type nl struct {
NodeLookup
}

type en struct {
v1alpha1.ExecutableNode
}

type ns struct {
v1alpha1.ExecutableNodeStatus
}

func TestNewFailureNodeLookup(t *testing.T) {
nl := nl{}
en := en{}
ns := ns{}
nodeLoopUp := NewFailureNodeLookup(nl, en, ns)
assert.NotNil(t, nl)
typed := nodeLoopUp.(FailureNodeLookup)
assert.Equal(t, nl, typed.NodeLookup)
assert.Equal(t, en, typed.FailureNode)
assert.Equal(t, ns, typed.FailureNodeStatus)
}

func TestNewTestFailureNodeLookup(t *testing.T) {
n := &mocks.ExecutableNode{}
ns := &mocks.ExecutableNodeStatus{}
failureNodeID := "fn1"
nl := NewTestNodeLookup(
map[string]v1alpha1.ExecutableNode{v1alpha1.StartNodeID: n, failureNodeID: n},
map[string]v1alpha1.ExecutableNodeStatus{v1alpha1.StartNodeID: ns, failureNodeID: ns},
)

assert.NotNil(t, nl)

failureNodeLookup := NewFailureNodeLookup(nl, n, ns)
r, ok := failureNodeLookup.GetNode(v1alpha1.StartNodeID)
assert.True(t, ok)
assert.Equal(t, n, r)
assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), v1alpha1.StartNodeID))

r, ok = failureNodeLookup.GetNode(failureNodeID)
assert.True(t, ok)
assert.Equal(t, n, r)
assert.Equal(t, ns, nl.GetNodeExecutionStatus(context.TODO(), failureNodeID))
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,36 @@ func TestGetSubWorkflow(t *testing.T) {
assert.Equal(t, swf, w)
})

t.Run("subworkflow with failure node", func(t *testing.T) {

wfNode := &coreMocks.ExecutableWorkflowNode{}
x := "x"
wfNode.OnGetSubWorkflowRef().Return(&x)

node := &coreMocks.ExecutableNode{}
node.OnGetWorkflowNode().Return(wfNode)

ectx := &execMocks.ExecutionContext{}

wfFailureNode := &coreMocks.ExecutableWorkflowNode{}
y := "y"
wfFailureNode.OnGetSubWorkflowRef().Return(&y)
failureNode := &coreMocks.ExecutableNode{}
failureNode.OnGetWorkflowNode().Return(wfFailureNode)

swf := &coreMocks.ExecutableSubWorkflow{}
swf.OnGetOnFailureNode().Return(failureNode)
ectx.OnFindSubWorkflow("x").Return(swf)

nCtx := &mocks.NodeExecutionContext{}
nCtx.OnNode().Return(node)
nCtx.OnExecutionContext().Return(ectx)

w, err := GetSubWorkflow(ctx, nCtx)
assert.NoError(t, err)
assert.Equal(t, swf, w)
})

t.Run("missing-subworkflow", func(t *testing.T) {

wfNode := &coreMocks.ExecutableWorkflowNode{}
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ func (c *workflowExecutor) handleFailingWorkflow(ctx context.Context, w *v1alpha
return StatusFailing(execErr), err
}

errorNode := w.GetOnFailureNode()
if errorNode != nil {
failureNode := w.GetOnFailureNode()
if failureNode != nil {
return StatusFailureNode(execErr), nil
}

Expand Down
4 changes: 3 additions & 1 deletion flytepropeller/pkg/controller/workflow/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) {
if assert.NoError(t, json.Unmarshal(wJSON, w)) {
// For benchmark workflow, we will run into the first failure on round 6

roundsToFail := 7
roundsToFail := 8
for i := 0; i < roundsToFail; i++ {
err := executor.HandleFlyteWorkflow(ctx, w)
assert.Nil(t, err, "Round [%v]", i)
Expand All @@ -534,6 +534,8 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Failing(t *testing.T) {

if i == roundsToFail-1 {
assert.Equal(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase)
} else if i == roundsToFail-2 {
assert.Equal(t, v1alpha1.WorkflowPhaseHandlingFailureNode, w.Status.Phase)
} else {
assert.NotEqual(t, v1alpha1.WorkflowPhaseFailed, w.Status.Phase, "For Round [%v] got phase [%v]", i, w.Status.Phase.String())
}
Expand Down
50 changes: 50 additions & 0 deletions flytepropeller/pkg/controller/workflow/testdata/benchmark_wf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,22 @@ spec:
status:
phase: 0
task: sum-non-none
onFailure:
id: en0
inputBindings:
- binding:
promise:
nodeId: start-node
var: triggered_date
var: triggered_date
kind: task
name: delete-cluster
resources:
requests:
cpu: "2"
memory: 2Gi
status:
phase: 0
status:
phase: 0
tasks:
Expand Down Expand Up @@ -290,6 +306,40 @@ tasks:
version: 1.19.0b10
timeout: 0s
type: "7"
delete-cluster:
container:
args:
- --task-module=flytekit.examples.tasks
- --task-name=print_every_time
- --inputs={{$input}}
- --output-prefix={{$output}}
command:
- flyte-python-entrypoint
image: myflytecontainer:abc123
resources:
requests:
- name: 1
value: "2.000"
- name: 3
value: 2048Mi
- name: 2
value: "0.000"
id:
name: delete-cluster
interface:
inputs:
variables:
date_triggered:
type:
simple: DATETIME
outputs:
variables: { }
metadata:
runtime:
type: 1
version: 1.19.0b10
timeout: 0s
type: "7"
sum-and-print:
container:
args:
Expand Down

0 comments on commit 040745e

Please sign in to comment.