From ed4d57872f905c871da642f94e7dad3e44e6d503 Mon Sep 17 00:00:00 2001 From: Chitrang Patel Date: Thu, 23 Nov 2023 11:20:10 -0500 Subject: [PATCH] TEP-0142: Surface step results via sidecar logs Prior to this, we enabled surfacing step results via termination message. This PR does the same thing via sidecar logs. --- cmd/sidecarlogresults/main.go | 9 +- .../{no-ci => alpha}/stepaction-results.yaml | 2 + .../sidecarlogresults/sidecarlogresults.go | 86 ++++++++++++-- pkg/pod/pod.go | 38 ++++-- pkg/pod/status.go | 109 +++++++++++++----- 5 files changed, 194 insertions(+), 50 deletions(-) rename examples/v1/taskruns/{no-ci => alpha}/stepaction-results.yaml (83%) diff --git a/cmd/sidecarlogresults/main.go b/cmd/sidecarlogresults/main.go index 70d8a7e9d76..216d71e80b1 100644 --- a/cmd/sidecarlogresults/main.go +++ b/cmd/sidecarlogresults/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "encoding/json" "flag" "log" "os" @@ -30,14 +31,20 @@ import ( func main() { var resultsDir string var resultNames string + var stepResultsStr string flag.StringVar(&resultsDir, "results-dir", pipeline.DefaultResultPath, "Path to the results directory. Default is /tekton/results") flag.StringVar(&resultNames, "result-names", "", "comma separated result names to expect from the steps running in the pod. eg. foo,bar,baz") + flag.StringVar(&stepResultsStr, "step-results", "", "json containing a map of step Name as key and list of result Names. eg. {\"stepName\":[\"foo\",\"bar\",\"baz\"]}") flag.Parse() if resultNames == "" { log.Fatal("result-names were not provided") } expectedResults := strings.Split(resultNames, ",") - err := sidecarlogresults.LookForResults(os.Stdout, pod.RunDir, resultsDir, expectedResults) + expectedStepResults := map[string][]string{} + if err := json.Unmarshal([]byte(stepResultsStr), &expectedStepResults); err != nil { + log.Fatal(err) + } + err := sidecarlogresults.LookForResults(os.Stdout, pod.RunDir, resultsDir, expectedResults, expectedStepResults) if err != nil { log.Fatal(err) } diff --git a/examples/v1/taskruns/no-ci/stepaction-results.yaml b/examples/v1/taskruns/alpha/stepaction-results.yaml similarity index 83% rename from examples/v1/taskruns/no-ci/stepaction-results.yaml rename to examples/v1/taskruns/alpha/stepaction-results.yaml index f4a285d83db..7ea048830a7 100644 --- a/examples/v1/taskruns/no-ci/stepaction-results.yaml +++ b/examples/v1/taskruns/alpha/stepaction-results.yaml @@ -6,8 +6,10 @@ spec: image: alpine results: - name: result1 + - name: result2 script: | echo "I am a Step Action!!!" >> $(step.results.result1.path) + echo "I am a hidden step action!!!" >> $(step.results.result2.path) --- apiVersion: tekton.dev/v1 kind: TaskRun diff --git a/internal/sidecarlogresults/sidecarlogresults.go b/internal/sidecarlogresults/sidecarlogresults.go index c290aaadd48..d92c9101e00 100644 --- a/internal/sidecarlogresults/sidecarlogresults.go +++ b/internal/sidecarlogresults/sidecarlogresults.go @@ -25,8 +25,10 @@ import ( "io" "os" "path/filepath" + "strings" "github.com/tektoncd/pipeline/pkg/apis/config" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" "github.com/tektoncd/pipeline/pkg/result" "golang.org/x/sync/errgroup" corev1 "k8s.io/api/core/v1" @@ -36,10 +38,19 @@ import ( // ErrSizeExceeded indicates that the result exceeded its maximum allowed size var ErrSizeExceeded = errors.New("results size exceeds configured limit") +type SidecarLogResultType string + +const ( + taskResultType SidecarLogResultType = "task" + stepResultType SidecarLogResultType = "step" + sidecarResultNameSeparator string = "." +) + // SidecarLogResult holds fields for storing extracted results type SidecarLogResult struct { Name string Value string + Type SidecarLogResultType } func fileExists(filename string) (bool, error) { @@ -89,10 +100,37 @@ func waitForStepsToFinish(runDir string) error { return nil } +func createSidecarResultName(stepName, resultName string) string { + return fmt.Sprintf("%s%s%s", stepName, sidecarResultNameSeparator, resultName) +} + +func ExtractStepAndResultFromSidecarResultName(sidecarResultName string) (string, string) { + splitString := strings.SplitN(sidecarResultName, sidecarResultNameSeparator, 2) + return splitString[0], splitString[1] +} + +func readResults(resultsDir, resultFile, stepName string, resultType SidecarLogResultType) (SidecarLogResult, error) { + value, err := os.ReadFile(filepath.Join(resultsDir, resultFile)) + if os.IsNotExist(err) { + return SidecarLogResult{}, nil + } else if err != nil { + return SidecarLogResult{}, fmt.Errorf("error reading the results file %w", err) + } + resultName := resultFile + if resultType == stepResultType { + resultName = createSidecarResultName(stepName, resultFile) + } + return SidecarLogResult{ + Name: resultName, + Value: string(value), + Type: resultType, + }, nil +} + // LookForResults waits for results to be written out by the steps // in their results path and prints them in a structured way to its // stdout so that the reconciler can parse those logs. -func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames []string) error { +func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames []string, stepResults map[string][]string) error { if err := waitForStepsToFinish(runDir); err != nil { return fmt.Errorf("error while waiting for the steps to finish %w", err) } @@ -102,20 +140,39 @@ func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames [ resultFile := resultFile g.Go(func() error { - value, err := os.ReadFile(filepath.Join(resultsDir, resultFile)) - if os.IsNotExist(err) { - return nil - } else if err != nil { - return fmt.Errorf("error reading the results file %w", err) + newResult, err := readResults(resultsDir, resultFile, "", taskResultType) + if err != nil { + return err } - newResult := SidecarLogResult{ - Name: resultFile, - Value: string(value), + if newResult.Name == "" { + return nil } results <- newResult return nil }) } + + for sName, sresults := range stepResults { + sresults := sresults + sName := sName + for _, resultName := range sresults { + resultName := resultName + stepResultsDir := filepath.Join(pipeline.StepsDir, sName, "results") + + g.Go(func() error { + newResult, err := readResults(stepResultsDir, resultName, sName, stepResultType) + if err != nil { + return err + } + if newResult.Name == "" { + return nil + } + results <- newResult + return nil + }) + } + } + channelGroup := new(errgroup.Group) channelGroup.Go(func() error { if err := g.Wait(); err != nil { @@ -183,10 +240,19 @@ func parseResults(resultBytes []byte, maxResultLimit int) (result.RunResult, err if len(resultBytes) > maxResultLimit { return runResult, fmt.Errorf("invalid result \"%s\": %w of %d", res.Name, ErrSizeExceeded, maxResultLimit) } + var resultType result.ResultType + switch res.Type { + case taskResultType: + resultType = result.TaskRunResultType + case stepResultType: + resultType = result.StepResultType + default: + return result.RunResult{}, fmt.Errorf("Invalid sidecar result type %v. Must be %v or %v", res.Type, taskResultType, stepResultType) + } runResult = result.RunResult{ Key: res.Name, Value: res.Value, - ResultType: result.TaskRunResultType, + ResultType: resultType, } return runResult, nil } diff --git a/pkg/pod/pod.go b/pkg/pod/pod.go index 04a0a2dec0e..3cd60308d02 100644 --- a/pkg/pod/pod.go +++ b/pkg/pod/pod.go @@ -18,6 +18,7 @@ package pod import ( "context" + "encoding/json" "fmt" "log" "math" @@ -190,7 +191,10 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1.TaskRun, taskSpec v1.Ta windows := usesWindows(taskRun) if sidecarLogsResultsEnabled && taskSpec.Results != nil { // create a results sidecar - resultsSidecar := createResultsSidecar(taskSpec, b.Images.SidecarLogResultsImage, setSecurityContext, windows) + resultsSidecar, err := createResultsSidecar(taskSpec, b.Images.SidecarLogResultsImage, setSecurityContext, windows) + if err != nil { + return nil, err + } taskSpec.Sidecars = append(taskSpec.Sidecars, resultsSidecar) commonExtraEntrypointArgs = append(commonExtraEntrypointArgs, "-result_from", config.ResultExtractionMethodSidecarLogs) } @@ -568,26 +572,46 @@ func entrypointInitContainer(image string, steps []v1.Step, setSecurityContext, // based on the spec of the Task, the image that should run in the results sidecar, // whether it will run on a windows node, and whether the sidecar should include a security context // that will allow it to run in namespaces with "restricted" pod security admission. -func createResultsSidecar(taskSpec v1.TaskSpec, image string, setSecurityContext, windows bool) v1.Sidecar { +func createResultsSidecar(taskSpec v1.TaskSpec, image string, setSecurityContext, windows bool) (v1.Sidecar, error) { names := make([]string, 0, len(taskSpec.Results)) for _, r := range taskSpec.Results { names = append(names, r.Name) } - securityContext := linuxSecurityContext - if windows { - securityContext = windowsSecurityContext - } resultsStr := strings.Join(names, ",") command := []string{"/ko-app/sidecarlogresults", "-results-dir", pipeline.DefaultResultPath, "-result-names", resultsStr} + + // create a map of container Name to step results + stepResults := map[string][]string{} + for i, s := range taskSpec.Steps { + if len(s.Results) > 0 { + stepName := StepName(s.Name, i) + stepResults[stepName] = make([]string, 0, len(s.Results)) + for _, r := range s.Results { + stepResults[stepName] = append(stepResults[stepName], r.Name) + } + } + } + + stepResultsBytes, err := json.Marshal(stepResults) + if err != nil { + return v1.Sidecar{}, err + } + if len(stepResultsBytes) > 0 { + command = append(command, "-step-results", string(stepResultsBytes)) + } sidecar := v1.Sidecar{ Name: pipeline.ReservedResultsSidecarName, Image: image, Command: command, } + securityContext := linuxSecurityContext + if windows { + securityContext = windowsSecurityContext + } if setSecurityContext { sidecar.SecurityContext = securityContext } - return sidecar + return sidecar, nil } // usesWindows returns true if the TaskRun will run on a windows node, diff --git a/pkg/pod/status.go b/pkg/pod/status.go index 848c72ba05e..16994024a8e 100644 --- a/pkg/pod/status.go +++ b/pkg/pod/status.go @@ -163,6 +163,46 @@ func MakeTaskRunStatus(ctx context.Context, logger *zap.SugaredLogger, tr v1.Tas return *trs, merr.ErrorOrNil() } +func createTaskResultsFromStepResults(stepRunRes []v1.TaskRunStepResult, neededStepResults map[string]string) []v1.TaskRunResult { + taskResults := []v1.TaskRunResult{} + for _, r := range stepRunRes { + // this result was requested by the Task + if _, ok := neededStepResults[r.Name]; ok { + taskRunResult := v1.TaskRunResult{ + Name: neededStepResults[r.Name], + Type: r.Type, + Value: r.Value, + } + taskResults = append(taskResults, taskRunResult) + } + } + return taskResults +} + +func getTaskResultsFromSidecarLogs(sidecarLogResults []result.RunResult) []result.RunResult { + taskResultsFromSidecarLogs := []result.RunResult{} + for _, slr := range sidecarLogResults { + if slr.ResultType == result.TaskRunResultType { + taskResultsFromSidecarLogs = append(taskResultsFromSidecarLogs, slr) + } + } + return taskResultsFromSidecarLogs +} + +func getStepResultsFromSidecarLogs(sidecarLogResults []result.RunResult, containerName string) []result.RunResult { + stepResultsFromSidecarLogs := []result.RunResult{} + for _, slr := range sidecarLogResults { + if slr.ResultType == result.StepResultType { + stepName, resultName := sidecarlogresults.ExtractStepAndResultFromSidecarResultName(slr.Key) + if stepName == containerName { + slr.Key = resultName + stepResultsFromSidecarLogs = append(stepResultsFromSidecarLogs, slr) + } + } + } + return stepResultsFromSidecarLogs +} + func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredLogger, stepStatuses []corev1.ContainerStatus, tr *v1.TaskRun, podPhase corev1.PodPhase, kubeclient kubernetes.Interface, ts *v1.TaskSpec) *multierror.Error { trs := &tr.Status var merr *multierror.Error @@ -178,24 +218,53 @@ func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredL // Extract results from sidecar logs sidecarLogsResultsEnabled := config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs + sidecarLogResults := []result.RunResult{} if sidecarLogsResultsEnabled && tr.Status.TaskSpec.Results != nil { // extraction of results from sidecar logs - sidecarLogResults, err := sidecarlogresults.GetResultsFromSidecarLogs(ctx, kubeclient, tr.Namespace, tr.Status.PodName, pipeline.ReservedResultsSidecarContainerName, podPhase) + slr, err := sidecarlogresults.GetResultsFromSidecarLogs(ctx, kubeclient, tr.Namespace, tr.Status.PodName, pipeline.ReservedResultsSidecarContainerName, podPhase) if err != nil { merr = multierror.Append(merr, err) } - // populate task run CRD with results from sidecar logs - // since sidecar logs does not support step results yet, it is empty for now. - taskResults, _, _ := filterResults(sidecarLogResults, specResults, []v1.StepResult{}) - if tr.IsDone() { - trs.Results = append(trs.Results, taskResults...) - } + sidecarLogResults = append(sidecarLogResults, slr...) + } + // Populate Task results from sidecar logs + taskResultsFromSidecarLogs := getTaskResultsFromSidecarLogs(sidecarLogResults) + taskResults, _, _ := filterResults(taskResultsFromSidecarLogs, specResults, nil) + if tr.IsDone() { + trs.Results = append(trs.Results, taskResults...) } + // Continue with extraction of termination messages for _, s := range stepStatuses { // Avoid changing the original value by modifying the pointer value. state := s.State.DeepCopy() taskRunStepResults := []v1.TaskRunStepResult{} + + // Identify Step Results + stepResults := []v1.StepResult{} + if ts != nil { + for _, step := range ts.Steps { + if getContainerName(step.Name) == s.Name { + stepResults = append(stepResults, step.Results...) + } + } + } + // Identify StepResults needed by the Task Results + neededStepResults, err := findStepResultsFetchedByTask(s.Name, specResults) + if err != nil { + merr = multierror.Append(merr, err) + } + + // populate step results from sidecar logs + stepResultsFromSidecarLogs := getStepResultsFromSidecarLogs(sidecarLogResults, s.Name) + _, stepRunRes, _ := filterResults(stepResultsFromSidecarLogs, specResults, stepResults) + if tr.IsDone() { + taskRunStepResults = append(taskRunStepResults, stepRunRes...) + // Set TaskResults from StepResults + trs.Results = append(trs.Results, createTaskResultsFromStepResults(stepRunRes, neededStepResults)...) + } + + // Parse termination messages if state.Terminated != nil && len(state.Terminated.Message) != 0 { msg := state.Terminated.Message @@ -215,35 +284,11 @@ func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredL merr = multierror.Append(merr, err) } - // Identify Step Results - stepResults := []v1.StepResult{} - if ts != nil { - for _, step := range ts.Steps { - if getContainerName(step.Name) == s.Name { - stepResults = append(stepResults, step.Results...) - } - } - } taskResults, stepRunRes, filteredResults := filterResults(results, specResults, stepResults) if tr.IsDone() { taskRunStepResults = append(taskRunStepResults, stepRunRes...) - // Identify StepResults needed by the Task Results - neededStepResults, err := findStepResultsFetchedByTask(s.Name, specResults) - if err != nil { - merr = multierror.Append(merr, err) - } // Set TaskResults from StepResults - for _, r := range stepRunRes { - // this result was requested by the Task - if _, ok := neededStepResults[r.Name]; ok { - taskRunResult := v1.TaskRunResult{ - Name: neededStepResults[r.Name], - Type: r.Type, - Value: r.Value, - } - taskResults = append(taskResults, taskRunResult) - } - } + taskResults = append(taskResults, createTaskResultsFromStepResults(stepRunRes, neededStepResults)...) trs.Results = append(trs.Results, taskResults...) } msg, err = createMessageFromResults(filteredResults)