Regression: fix results with out of order tasks
The pipeline run reconciler builds a pipeline run state on every
run, which resolves task references, expands results and processes
matrix fan-outs.

The current process is incremental in a single loop, where each
new PipelineTask resolution depends on the state of PipelineTasks
resolved before. This is problematic because tasks are not
necessarily defined in the pipeline in order of execution (which
is undefined, given that pipelines are DAGs).
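The order sensitivity described above can be illustrated with a minimal sketch. The types and function below are hypothetical stand-ins, not the actual Tekton reconciler code: a single incremental pass resolves each task against only the tasks seen before it, so a consumer defined earlier in the list than its producer cannot be resolved even though the DAG is valid.

```go
package main

import "fmt"

// task is a hypothetical stand-in for a PipelineTask: "needs" names the
// task whose result it consumes ("" if none).
type task struct {
	name  string
	needs string
}

// resolveIncremental walks the task list once, resolving each task
// against only the tasks seen so far, and returns the names of tasks
// whose result reference could not be resolved.
func resolveIncremental(tasks []task) []string {
	seen := map[string]bool{}
	var unresolved []string
	for _, t := range tasks {
		if t.needs != "" && !seen[t.needs] {
			unresolved = append(unresolved, t.name)
		}
		seen[t.name] = true
	}
	return unresolved
}

func main() {
	// "consumer" is defined before "producer": a valid DAG, but the
	// single incremental pass cannot resolve consumer's reference.
	outOfOrder := []task{{name: "consumer", needs: "producer"}, {name: "producer"}}
	fmt.Println(resolveIncremental(outOfOrder)) // prints "[consumer]"
}
```

The same pipeline with the tasks listed in execution order resolves cleanly, which is why the bug only surfaces for pipelines whose definition order differs from their execution order.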

Since this PR is a fix to a regression, it aims to be as minimal
as possible. The smallest available solution is to introduce a
coarse ordering of the task list, so that the incremental state
can be built correctly.

This PR splits the process into two passes: one for tasks that
have already been started (and possibly completed), and a second
one that includes all remaining tasks. The first group of tasks
does not need matrix fan-outs (they have already been processed)
or result resolution, so its state can be safely built
incrementally.

The second pass starts from the state built by the first one.
Any task that is a candidate for execution in this reconcile
cycle can then have its results resolved through the state of
the first group.
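The partitioning step can be sketched in isolation. The types below are simplified stand-ins for the Tekton API types (the real ones live in the Tekton API packages); the function mirrors the split performed in the diff further down, using the PipelineRun's child references to decide which tasks have already started.

```go
package main

import "fmt"

// Simplified stand-ins for the Tekton types involved (hypothetical,
// for illustration only).
type PipelineTask struct{ Name string }
type ChildStatusReference struct{ PipelineTaskName string }

// splitTasks partitions pipeline tasks into those that already have a
// (Task|Custom)Run child reference and those that have not started yet,
// mirroring the two-pass resolution described above.
func splitTasks(tasks []PipelineTask, children []ChildStatusReference) (started, notStarted []PipelineTask) {
	startedNames := map[string]bool{}
	for _, c := range children {
		startedNames[c.PipelineTaskName] = true
	}
	for _, t := range tasks {
		if startedNames[t.Name] {
			started = append(started, t)
		} else {
			notStarted = append(notStarted, t)
		}
	}
	return started, notStarted
}

func main() {
	tasks := []PipelineTask{{Name: "build"}, {Name: "test"}, {Name: "deploy"}}
	children := []ChildStatusReference{{PipelineTaskName: "build"}}
	started, notStarted := splitTasks(tasks, children)
	fmt.Println(len(started), len(notStarted)) // prints "1 2"
}
```

Because every started task's results are resolved before any not-yet-started task is examined, the second pass sees all results that are available in this reconcile cycle, regardless of definition order.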

Testing with the current code arrangement is a bit challenging,
as we ignore result resolution errors in the code, which is ok
only in some cases:
- a result resolution failure caused by a task not being found or a
  result not being defined is permanent and should not be ignored
- a result resolution failure caused by a result not being available
  yet is (possibly) ephemeral and should not cause a failure

Currently a single function checks for all these conditions and
returns a single error, so it's not possible to safely distinguish
them. Fixing this will require some refactoring, planned in a
follow-up patch.
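One possible shape for that follow-up refactoring is sentinel errors checked with errors.Is, so callers can tell a permanent failure from a result that simply is not available yet. The error names below do not exist in Tekton; they are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical sentinel errors sketching the refactoring described
// above; these names are illustrative, not actual Tekton identifiers.
var (
	ErrResultNotDefined      = errors.New("result not defined by task") // permanent
	ErrResultNotAvailableYet = errors.New("result not yet available")   // ephemeral
)

// shouldFailPipelineRun reports whether a result-resolution error is
// permanent (fail the run) rather than ephemeral (requeue and retry).
func shouldFailPipelineRun(err error) bool {
	return err != nil && !errors.Is(err, ErrResultNotAvailableYet)
}

func main() {
	fmt.Println(shouldFailPipelineRun(ErrResultNotDefined))      // prints "true": permanent, fail
	fmt.Println(shouldFailPipelineRun(ErrResultNotAvailableYet)) // prints "false": retry later
}
```

With distinct sentinels, the reconciler could stop swallowing all result-resolution errors and only tolerate the genuinely ephemeral case, which would also make the fix straightforward to unit-test.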

For now, a reconcile unit test (TBD) may be able to test the fix.

Fixes: #7103

Signed-off-by: Andrea Frittoli <[email protected]>
afrittoli committed Oct 3, 2023
1 parent e8b85e2 commit a801d03
Showing 1 changed file: pkg/reconciler/pipelinerun/pipelinerun.go (42 additions, 3 deletions)
@@ -325,10 +325,10 @@ func (c *Reconciler) resolvePipelineState(
 	ctx context.Context,
 	tasks []v1.PipelineTask,
 	pipelineMeta *metav1.ObjectMeta,
-	pr *v1.PipelineRun) (resources.PipelineRunState, error) {
+	pr *v1.PipelineRun,
+	pst resources.PipelineRunState) (resources.PipelineRunState, error) {
 	ctx, span := c.tracerProvider.Tracer(TracerName).Start(ctx, "resolvePipelineState")
 	defer span.End()
-	pst := resources.PipelineRunState{}
 	// Resolve each task individually because they each could have a different reference context (remote or local).
 	for _, task := range tasks {
 		// We need the TaskRun name to ensure that we don't perform an additional remote resolution request for a PipelineTask
@@ -536,7 +536,46 @@ func (c *Reconciler) reconcile(ctx context.Context, pr *v1.PipelineRun, getPipel
 	if len(pipelineSpec.Finally) > 0 {
 		tasks = append(tasks, pipelineSpec.Finally...)
 	}
-	pipelineRunState, err := c.resolvePipelineState(ctx, tasks, pipelineMeta.ObjectMeta, pr)
+
+	// We split tasks in two lists:
+	// - those with a completed (Task|Custom)Run reference (i.e. those that finished running)
+	// - those without a (Task|Custom)Run reference
+	// We resolve the status for the former first, to collect all results available at this stage
+	// We know that tasks in progress or completed have had their fan-out already calculated so
+	// they can be safely processed in the first iteration. The underlying assumption is that if
+	// a PipelineTask has at least one TaskRun associated, then all its TaskRuns have been
+	// created already.
+	// The second group takes as input the partial state built in the first iteration and finally
+	// the two results are collated
+	ranOrRunningTaskNames := sets.Set[string]{}
+	ranOrRunningTasks := []v1.PipelineTask{}
+	notStartedTasks := []v1.PipelineTask{}
+
+	for _, child := range pr.Status.ChildReferences {
+		ranOrRunningTaskNames.Insert(child.PipelineTaskName)
+	}
+	for _, task := range tasks {
+		if ranOrRunningTaskNames.Has(task.Name) {
+			ranOrRunningTasks = append(ranOrRunningTasks, task)
+		} else {
+			notStartedTasks = append(notStartedTasks, task)
+		}
+	}
+	// First iteration
+	pst := resources.PipelineRunState{}
+	pipelineRunState, err := c.resolvePipelineState(ctx, ranOrRunningTasks, pipelineMeta.ObjectMeta, pr, pst)
+	switch {
+	case errors.Is(err, remote.ErrRequestInProgress):
+		message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
+		pr.Status.MarkRunning(v1.TaskRunReasonResolvingTaskRef, message)
+		return nil
+	case err != nil:
+		return err
+	default:
+	}
+
+	// Second iteration
+	pipelineRunState, err = c.resolvePipelineState(ctx, notStartedTasks, pipelineMeta.ObjectMeta, pr, pipelineRunState)
 	switch {
 	case errors.Is(err, remote.ErrRequestInProgress):
 		message := fmt.Sprintf("PipelineRun %s/%s awaiting remote resource", pr.Namespace, pr.Name)
