Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-v0.56.x] Run finally pipeline even if task is failed at the validation #8370

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions pkg/reconciler/pipelinerun/pipelinerun.go
Original file line number Diff line number Diff line change
Expand Up @@ -837,14 +837,22 @@ func (c *Reconciler) runNextSchedulableTask(ctx context.Context, pr *v1.Pipeline
return controller.NewPermanentError(err)
}

// Check for Missing Result References
err = resources.CheckMissingResultReferences(pipelineRunFacts.State, nextRpts)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
pr.Status.MarkFailed(v1.PipelineRunReasonInvalidTaskResultReference.String(), err.Error())
return controller.NewPermanentError(err)
for _, rpt := range nextRpts {
// Check for Missing Result References
// if error found, present rpt will be
// added to the validationFailedTask list
err := resources.CheckMissingResultReferences(pipelineRunFacts.State, rpt)
if err != nil {
logger.Infof("Failed to resolve task result reference for %q with error %v", pr.Name, err)
// If there is an error encountered, no new task
// will be scheduled, hence nextRpts should be empty
// If finally tasks are found, then those tasks will
// be added to the nextRpts
nextRpts = nil
logger.Infof("Adding the task %q to the validation failed list", rpt.ResolvedTask)
pipelineRunFacts.ValidationFailedTask = append(pipelineRunFacts.ValidationFailedTask, rpt)
}
}

// GetFinalTasks only returns final tasks when a DAG is complete
fNextRpts := pipelineRunFacts.GetFinalTasks()
if len(fNextRpts) != 0 {
Expand Down
10 changes: 5 additions & 5 deletions pkg/reconciler/pipelinerun/pipelinerun_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1266,8 +1266,8 @@ status:
image: busybox
script: 'exit 0'
conditions:
- message: "Invalid task result reference: Could not find result with name result2 for task task1"
reason: InvalidTaskResultReference
- message: "Tasks Completed: 2 (Failed: 1, Cancelled 0), Skipped: 0"
reason: Failed
status: "False"
type: Succeeded
childReferences:
Expand All @@ -1283,15 +1283,15 @@ status:
prt := newPipelineRunTest(t, d)
defer prt.Cancel()

reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-missing-results", []string{}, true)
reconciledRun, clients := prt.reconcileRun("foo", "test-pipeline-missing-results", []string{}, false)
if reconciledRun.Status.CompletionTime == nil {
t.Errorf("Expected a CompletionTime on invalid PipelineRun but was nil")
}

// The PipelineRun should be marked as failed due to InvalidTaskResultReference.
// The PipelineRun should be marked as failed
if d := cmp.Diff(expectedPipelineRun, reconciledRun, ignoreResourceVersion, ignoreLastTransitionTime, ignoreTypeMeta,
ignoreStartTime, ignoreCompletionTime, ignoreProvenance); d != "" {
t.Errorf("Expected to see PipelineRun run marked as failed with the reason: InvalidTaskResultReference. Diff %s", diff.PrintWantGot(d))
t.Errorf("Expected to see PipelineRun run marked as failed. Diff %s", diff.PrintWantGot(d))
}

// Check that the expected TaskRun was created
Expand Down
60 changes: 34 additions & 26 deletions pkg/reconciler/pipelinerun/resources/pipelinerunresolution.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (t *ResolvedPipelineTask) EvaluateCEL() error {

// isDone returns true only if the task is skipped, succeeded or failed
func (t ResolvedPipelineTask) isDone(facts *PipelineRunFacts) bool {
return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure()
return t.Skip(facts).IsSkipped || t.isSuccessful() || t.isFailure() || t.isValidationFailed(facts.ValidationFailedTask)
}

// IsRunning returns true only if the task is neither succeeded, cancelled nor failed
Expand Down Expand Up @@ -185,6 +185,16 @@ func (t ResolvedPipelineTask) isFailure() bool {
return t.haveAnyTaskRunsFailed() && isDone
}

// isValidationFailed return true if the task is failed at the validation step
func (t ResolvedPipelineTask) isValidationFailed(ftasks []*ResolvedPipelineTask) bool {
for _, ftask := range ftasks {
if ftask.ResolvedTask == t.ResolvedTask {
return true
}
}
return false
}

// isCancelledForTimeOut returns true only if the run is cancelled due to PipelineRun-controlled timeout
// If the PipelineTask has a Matrix, isCancelled returns true if any run is cancelled due to PipelineRun-controlled timeout and all other runs are done.
func (t ResolvedPipelineTask) isCancelledForTimeOut() bool {
Expand Down Expand Up @@ -306,7 +316,7 @@ func (t *ResolvedPipelineTask) skip(facts *PipelineRunFacts) TaskSkipStatus {
var skippingReason v1.SkippingReason

switch {
case facts.isFinalTask(t.PipelineTask.Name) || t.isScheduled():
case facts.isFinalTask(t.PipelineTask.Name) || t.isScheduled() || t.isValidationFailed(facts.ValidationFailedTask):
skippingReason = v1.None
case facts.IsStopping():
skippingReason = v1.StoppingSkip
Expand Down Expand Up @@ -777,31 +787,29 @@ func isCustomRunCancelledByPipelineRunTimeout(cr *v1beta1.CustomRun) bool {
// CheckMissingResultReferences returns an error if it is missing any result references.
// Missing result references can occur if task fails to produce a result but has
// OnError: continue (ie TestMissingResultWhenStepErrorIsIgnored)
func CheckMissingResultReferences(pipelineRunState PipelineRunState, targets PipelineRunState) error {
for _, target := range targets {
for _, resultRef := range v1.PipelineTaskResultRefs(target.PipelineTask) {
referencedPipelineTask, ok := pipelineRunState.ToMap()[resultRef.PipelineTask]
if !ok {
return fmt.Errorf("Result reference error: Could not find ref \"%s\" in internal pipelineRunState", resultRef.PipelineTask)
func CheckMissingResultReferences(pipelineRunState PipelineRunState, target *ResolvedPipelineTask) error {
for _, resultRef := range v1.PipelineTaskResultRefs(target.PipelineTask) {
referencedPipelineTask, ok := pipelineRunState.ToMap()[resultRef.PipelineTask]
if !ok {
return fmt.Errorf("Result reference error: Could not find ref \"%s\" in internal pipelineRunState", resultRef.PipelineTask)
}
if referencedPipelineTask.IsCustomTask() {
if len(referencedPipelineTask.CustomRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length CustomRuns", resultRef.PipelineTask)
}
if referencedPipelineTask.IsCustomTask() {
if len(referencedPipelineTask.CustomRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length CustomRuns", resultRef.PipelineTask)
}
customRun := referencedPipelineTask.CustomRuns[0]
_, err := findRunResultForParam(customRun, resultRef)
if err != nil {
return err
}
} else {
if len(referencedPipelineTask.TaskRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length TaskRuns", resultRef.PipelineTask)
}
taskRun := referencedPipelineTask.TaskRuns[0]
_, err := findTaskResultForParam(taskRun, resultRef)
if err != nil {
return err
}
customRun := referencedPipelineTask.CustomRuns[0]
_, err := findRunResultForParam(customRun, resultRef)
if err != nil {
return err
}
} else {
if len(referencedPipelineTask.TaskRuns) == 0 {
return fmt.Errorf("Result reference error: Internal result ref \"%s\" has zero-length TaskRuns", resultRef.PipelineTask)
}
taskRun := referencedPipelineTask.TaskRuns[0]
_, err := findTaskResultForParam(taskRun, resultRef)
if err != nil {
return err
}
}
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/reconciler/pipelinerun/resources/pipelinerunstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ type PipelineRunFacts struct {
// The skip data is sensitive to changes in the state. The ResetSkippedCache method
// can be used to clean the cache and force re-computation when needed.
SkipCache map[string]TaskSkipStatus

// ValidationFailedTask are the tasks for which taskrun is not created as they
// never got added to the execution i.e. they failed in the validation step. One of
// the case of failing at the validation is during CheckMissingResultReferences method
// Tasks in ValidationFailedTask is added in method runNextSchedulableTask
ValidationFailedTask []*ResolvedPipelineTask
}

// PipelineRunTimeoutsState records information about start times and timeouts for the PipelineRun, so that the PipelineRunFacts
Expand Down Expand Up @@ -284,7 +290,7 @@ func (state PipelineRunState) getNextTasks(candidateTasks sets.String) []*Resolv
func (facts *PipelineRunFacts) IsStopping() bool {
for _, t := range facts.State {
if facts.isDAGTask(t.PipelineTask.Name) {
if t.isFailure() && t.PipelineTask.OnError != v1.PipelineTaskContinue {
if (t.isFailure() || t.isValidationFailed(facts.ValidationFailedTask)) && t.PipelineTask.OnError != v1.PipelineTaskContinue {
return true
}
}
Expand Down Expand Up @@ -657,6 +663,8 @@ func (facts *PipelineRunFacts) getPipelineTasksCount() pipelineRunStatusCount {
} else {
s.Failed++
}
case t.isValidationFailed(facts.ValidationFailedTask):
s.Failed++
// increment skipped and skipped due to timeout counters since the task was skipped due to the pipeline, tasks, or finally timeout being reached before the task was launched
case t.Skip(facts).SkippingReason == v1.PipelineTimedOutSkip ||
t.Skip(facts).SkippingReason == v1.TasksTimedOutSkip ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,13 @@ func TestCheckMissingResultReferences(t *testing.T) {
wantErr: "Result reference error: Internal result ref \"lTask\" has zero-length TaskRuns",
}} {
t.Run(tt.name, func(t *testing.T) {
err := CheckMissingResultReferences(tt.pipelineRunState, tt.targets)
var err error
for _, target := range tt.targets {
tmpErr := CheckMissingResultReferences(tt.pipelineRunState, target)
if tmpErr != nil {
err = tmpErr
}
}
if (err != nil) && err.Error() != tt.wantErr {
t.Errorf("CheckMissingResultReferences() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
122 changes: 122 additions & 0 deletions test/pipelinefinally_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,98 @@ spec:
}
}

func TestPipelineLevelFinally_OneDAGNotProducingResult_SecondDAGUsingResult_Failure(t *testing.T) {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c, namespace := setup(ctx, t)
knativetest.CleanupOnInterrupt(func() { tearDown(ctx, t, c, namespace) }, t.Logf)
defer tearDown(ctx, t, c, namespace)

successTask := getSuccessTask(t, namespace)
successTask.Spec.Results = append(successTask.Spec.Results, v1.TaskResult{
Name: "result",
})
if _, err := c.V1TaskClient.Create(ctx, successTask, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create final Task: %s", err)
}

taskClaimingResultProductionButNotProducing := getSuccessTaskClaimProducingResultButNotProducing(t, namespace)
if _, err := c.V1TaskClient.Create(ctx, taskClaimingResultProductionButNotProducing, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task claiming result production but not producing task results: %s", err)
}

taskConsumingResultInParam := getTaskConsumingResults(t, namespace, "dagtask1-result")
if _, err := c.V1TaskClient.Create(ctx, taskConsumingResultInParam, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Task consuming task results in param: %s", err)
}

pipeline := parse.MustParseV1Pipeline(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
finally:
- name: finaltask1
taskRef:
name: %s
tasks:
- name: dagtask1
taskRef:
name: %s
- name: dagtaskconsumingdagtask1
params:
- name: dagtask1-result
value: $(tasks.dagtask1.results.result)
taskRef:
name: %s
`, helpers.ObjectNameForTest(t), namespace, successTask.Name, taskClaimingResultProductionButNotProducing.Name, taskConsumingResultInParam.Name))
if _, err := c.V1PipelineClient.Create(ctx, pipeline, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline: %s", err)
}

pipelineRun := getPipelineRun(t, namespace, pipeline.Name)
if _, err := c.V1PipelineRunClient.Create(ctx, pipelineRun, metav1.CreateOptions{}); err != nil {
t.Fatalf("Failed to create Pipeline Run `%s`: %s", pipelineRun.Name, err)
}

if err := WaitForPipelineRunState(ctx, c, pipelineRun.Name, timeout, PipelineRunFailed(pipelineRun.Name), "PipelineRunFailed", v1Version); err != nil {
t.Fatalf("Waiting for PipelineRun %s to fail: %v", pipelineRun.Name, err)
}

taskrunList, err := c.V1TaskRunClient.List(ctx, metav1.ListOptions{LabelSelector: "tekton.dev/pipelineRun=" + pipelineRun.Name})
if err != nil {
t.Fatalf("Error listing TaskRuns for PipelineRun %s: %s", pipelineRun.Name, err)
}

// expecting taskRuns for finaltask1 and dagtask
expectedTaskRunsCount := 2
if len(taskrunList.Items) != expectedTaskRunsCount {
var s []string
for _, n := range taskrunList.Items {
s = append(s, n.Labels["tekton.dev/pipelineTask"])
}
t.Fatalf("Error retrieving TaskRuns for PipelineRun %s. Expected %d taskRuns and found %d taskRuns for: %s",
pipelineRun.Name, expectedTaskRunsCount, len(taskrunList.Items), strings.Join(s, ", "))
}

// verify dag task failed, parallel dag task succeeded, and final task succeeded
for _, taskrunItem := range taskrunList.Items {
switch n := taskrunItem.Labels["tekton.dev/pipelineTask"]; {
case n == "dagtask1":
if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess", v1Version); err != nil {
t.Errorf("Error waiting for TaskRun to succeed: %v", err)
}
case n == "finaltask1":
if err := WaitForTaskRunState(ctx, c, taskrunItem.Name, TaskRunSucceed(taskrunItem.Name), "TaskRunSuccess", v1Version); err != nil {
t.Errorf("Error waiting for TaskRun to succeed: %v", err)
}
default:
t.Fatalf("Found unexpected taskRun %s", n)
}
}
}

func getSuccessTask(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
Expand Down Expand Up @@ -760,6 +852,36 @@ spec:
`, helpers.ObjectNameForTest(t), namespace))
}

func getSuccessTaskClaimProducingResultButNotProducing(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
steps:
- image: mirror.gcr.io/alpine
script: echo -n "Hello"
results:
- name: result
`, helpers.ObjectNameForTest(t), namespace))
}

func getTaskConsumingResults(t *testing.T, namespace string, paramName string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
metadata:
name: %s
namespace: %s
spec:
steps:
- image: mirror.gcr.io/alpine
script: 'echo "Content of param: $(params.%s)" '
params:
- name: %s
`, helpers.ObjectNameForTest(t), namespace, paramName, paramName))
}

func getDelaySuccessTaskProducingResults(t *testing.T, namespace string) *v1.Task {
t.Helper()
return parse.MustParseV1Task(t, fmt.Sprintf(`
Expand Down