fix: controller: ensure workflow reconciles task results properly when failing to receive timely updates from API server #14026

Open
wants to merge 1 commit into
base: main
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
@@ -1986,6 +1986,15 @@ func (ws *WorkflowStatus) IsTaskResultIncomplete(name string) bool {
 	return false // workflows from older versions do not have this status, so assume completed if this is missing
 }

+func (ws *WorkflowStatus) IsTaskResultInited(name string) bool {
+	if ws.TaskResultsCompletionStatus == nil {
+		return false
+	}
+
+	_, found := ws.TaskResultsCompletionStatus[name]
+	return found
+}
+
 func (ws *WorkflowStatus) IsOffloadNodeStatus() bool {
 	return ws.OffloadNodeStatusVersion != ""
 }
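
To make the difference between the new helper and `IsTaskResultIncomplete` concrete, here is a minimal, self-contained sketch. The `status` type and its method names are hypothetical stand-ins for `WorkflowStatus`, and the sketch assumes the completion map stores `true` for a completed task result and `false` for a pending one; the real type may differ.

```go
package main

import "fmt"

// status is a hypothetical stand-in for WorkflowStatus. Assumption: the map
// value is true once a task result is complete and false while it is pending.
type status struct {
	completion map[string]bool
}

// isInited mirrors IsTaskResultInited from the diff: it reports whether any
// entry exists for the name, regardless of its value.
func (s *status) isInited(name string) bool {
	if s.completion == nil {
		return false
	}
	_, found := s.completion[name]
	return found
}

// isIncomplete models the existing helper under the assumption above: a
// missing entry is treated as complete.
func (s *status) isIncomplete(name string) bool {
	done, found := s.completion[name]
	if found {
		return !done
	}
	return false
}

func main() {
	s := &status{completion: map[string]bool{"pending": false, "done": true}}
	for _, name := range []string{"pending", "done", "never-seen"} {
		fmt.Printf("%-10s inited=%-5v incomplete=%v\n", name, s.isInited(name), s.isIncomplete(name))
	}
}
```

The `never-seen` case is the one this PR targets: an entry that was never recorded looks complete to `isIncomplete`, so only the existence check can tell it apart from a finished task result.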
18 changes: 18 additions & 0 deletions workflow/controller/operator.go
@@ -1438,6 +1438,24 @@ func (woc *wfOperationCtx) assessNodeStatus(ctx context.Context, pod *apiv1.Pod,
 		new.PodIP = pod.Status.PodIP
 	}

+	if resultName := woc.nodeID(pod); new.Phase == wfv1.NodeSucceeded &&
+		tmpl.HasOutputs() && !woc.wf.Status.IsTaskResultInited(resultName) {
+		// Error scenario: a pod for a workflow step has completed, and its task
+		// result was properly created and finalized by its wait container (judging
+		// from the wait container's exit status). However, the task result informer
+		// in the controller leader has not received any update about it (for example,
+		// because the API server or etcd is overloaded).
+		//
+		// The fix is to forcefully mark the workflow as having an incomplete
+		// TaskResult in assessNodeStatus.
+		//
+		// This fix does not handle the case where a pod failed: there are too many
+		// potential failure scenarios (for example, the wait container might not be
+		// able to insert a task result), and a retry is probably needed after a
+		// failure anyway. The loss is probably not as great as for a successful pod.
Comment on lines +1452 to +1455
Member

We have retries for transient errors in the wait container to handle this:

func (we *WorkflowExecutor) reportResult(ctx context.Context, result wfv1.NodeResult) error {
	return retryutil.OnError(wait.Backoff{
		Duration: time.Second,
		Factor:   2,
		Jitter:   0.1,
		Steps:    5,
		Cap:      30 * time.Second,

+		woc.wf.Status.MarkTaskResultIncomplete(resultName)
+	}
+
 	new.HostNodeName = pod.Spec.NodeName

 	if !new.Progress.IsValid() {
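
The guard added to `assessNodeStatus` can be read as a small decision rule. The sketch below restates it as a pure function and walks through the intended sequence of events. The names `tracker` and `shouldMarkIncomplete` and the node ID are hypothetical and are not part of the Argo Workflows code base; the sketch also assumes that `false` in the map means "still waiting for the task result".

```go
package main

import "fmt"

// tracker is a hypothetical stand-in for the workflow's task result
// bookkeeping. Assumption: false means "still waiting for the task result".
type tracker map[string]bool

// shouldMarkIncomplete restates the condition from the diff: the pod
// succeeded, its template declares outputs, and no task result entry has
// been recorded for it yet.
func shouldMarkIncomplete(succeeded, hasOutputs bool, tr tracker, name string) bool {
	_, inited := tr[name]
	return succeeded && hasOutputs && !inited
}

func main() {
	tr := tracker{}
	name := "wf-step-1" // hypothetical node ID

	// The pod reports success before the informer has delivered any task result.
	if shouldMarkIncomplete(true, true, tr, name) {
		tr[name] = false // the workflow now waits for the task result
	}
	fmt.Println("after pod success:", tr)

	// A second assessment of the same pod must not reset the entry.
	fmt.Println("mark again?", shouldMarkIncomplete(true, true, tr, name))

	// The task result finally arrives and is recorded as complete.
	tr[name] = true
	fmt.Println("after task result:", tr)
}
```

Because the rule only fires when no entry exists, it cannot overwrite a task result that the informer has already recorded as complete.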
22 changes: 19 additions & 3 deletions workflow/controller/operator_test.go
@@ -10855,8 +10855,20 @@ spec:
i = random.randint(1, 100)
print(i)`

-// TestWorkflowNeedReconcile test whether a workflow need reconcile taskresults.
+// testWorkflowNeedReconcile tests whether a workflow needs to reconcile task results.
 func TestWorkflowNeedReconcile(t *testing.T) {
+	t.Run("hasIncompleteTaskResult", func(t *testing.T) {
+		hasIncompleteTaskResult := true
+		testWorkflowNeedReconcileHelper(t, hasIncompleteTaskResult)
+	})
+
+	t.Run("hasNoTaskResult", func(t *testing.T) {
+		hasIncompleteTaskResult := false
+		testWorkflowNeedReconcileHelper(t, hasIncompleteTaskResult)
+	})
+}
+
+func testWorkflowNeedReconcileHelper(t *testing.T, hasIncompleteTaskResult bool) {
 	cancel, controller := newController()
 	defer cancel()
 	ctx := context.Background()
@@ -10877,9 +10889,13 @@ func TestWorkflowNeedReconcile(t *testing.T) {
 	wf, err = wfcset.Get(ctx, wf.ObjectMeta.Name, metav1.GetOptions{})
 	require.NoError(t, err)
 	woc = newWorkflowOperationCtx(wf, controller)
-	for _, node := range woc.wf.Status.Nodes {
-		woc.wf.Status.MarkTaskResultIncomplete(node.ID)
+
+	if hasIncompleteTaskResult {
+		for _, node := range woc.wf.Status.Nodes {
+			woc.wf.Status.MarkTaskResultIncomplete(node.ID)
+		}
 	}
+
 	err, podReconciliationCompleted := woc.podReconciliation(ctx)
 	require.NoError(t, err)
 	assert.False(t, podReconciliationCompleted)