Skip to content

Commit

Permalink
TEP-0142: Surface step results via sidecar logs
Browse files Browse the repository at this point in the history
Prior to this, we enabled surfacing step results via termination message. This PR does the same thing via sidecar logs.
  • Loading branch information
chitrangpatel committed Nov 23, 2023
1 parent b395663 commit ed4d578
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 50 deletions.
9 changes: 8 additions & 1 deletion cmd/sidecarlogresults/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"encoding/json"
"flag"
"log"
"os"
Expand All @@ -30,14 +31,20 @@ import (
func main() {
var resultsDir string
var resultNames string
var stepResultsStr string
flag.StringVar(&resultsDir, "results-dir", pipeline.DefaultResultPath, "Path to the results directory. Default is /tekton/results")
flag.StringVar(&resultNames, "result-names", "", "comma separated result names to expect from the steps running in the pod. eg. foo,bar,baz")
flag.StringVar(&stepResultsStr, "step-results", "", "json containing a map of step Name as key and list of result Names. eg. {\"stepName\":[\"foo\",\"bar\",\"baz\"]}")
flag.Parse()
if resultNames == "" {
log.Fatal("result-names were not provided")
}
expectedResults := strings.Split(resultNames, ",")
err := sidecarlogresults.LookForResults(os.Stdout, pod.RunDir, resultsDir, expectedResults)
expectedStepResults := map[string][]string{}
if err := json.Unmarshal([]byte(stepResultsStr), &expectedStepResults); err != nil {
log.Fatal(err)
}
err := sidecarlogresults.LookForResults(os.Stdout, pod.RunDir, resultsDir, expectedResults, expectedStepResults)
if err != nil {
log.Fatal(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ spec:
image: alpine
results:
- name: result1
- name: result2
script: |
echo "I am a Step Action!!!" >> $(step.results.result1.path)
echo "I am a hidden step action!!!" >> $(step.results.result2.path)
---
apiVersion: tekton.dev/v1
kind: TaskRun
Expand Down
86 changes: 76 additions & 10 deletions internal/sidecarlogresults/sidecarlogresults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"io"
"os"
"path/filepath"
"strings"

"github.com/tektoncd/pipeline/pkg/apis/config"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/result"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
Expand All @@ -36,10 +38,19 @@ import (
// ErrSizeExceeded indicates that the result exceeded its maximum allowed size
var ErrSizeExceeded = errors.New("results size exceeds configured limit")

type SidecarLogResultType string

const (
taskResultType SidecarLogResultType = "task"
stepResultType SidecarLogResultType = "step"
sidecarResultNameSeparator string = "."
)

// SidecarLogResult holds fields for storing extracted results
type SidecarLogResult struct {
Name string
Value string
Type SidecarLogResultType
}

func fileExists(filename string) (bool, error) {
Expand Down Expand Up @@ -89,10 +100,37 @@ func waitForStepsToFinish(runDir string) error {
return nil
}

func createSidecarResultName(stepName, resultName string) string {
return fmt.Sprintf("%s%s%s", stepName, sidecarResultNameSeparator, resultName)
}

func ExtractStepAndResultFromSidecarResultName(sidecarResultName string) (string, string) {
splitString := strings.SplitN(sidecarResultName, sidecarResultNameSeparator, 2)
return splitString[0], splitString[1]
}

func readResults(resultsDir, resultFile, stepName string, resultType SidecarLogResultType) (SidecarLogResult, error) {
value, err := os.ReadFile(filepath.Join(resultsDir, resultFile))
if os.IsNotExist(err) {
return SidecarLogResult{}, nil
} else if err != nil {
return SidecarLogResult{}, fmt.Errorf("error reading the results file %w", err)
}
resultName := resultFile
if resultType == stepResultType {
resultName = createSidecarResultName(stepName, resultFile)
}
return SidecarLogResult{
Name: resultName,
Value: string(value),
Type: resultType,
}, nil
}

// LookForResults waits for results to be written out by the steps
// in their results path and prints them in a structured way to its
// stdout so that the reconciler can parse those logs.
func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames []string) error {
func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames []string, stepResults map[string][]string) error {
if err := waitForStepsToFinish(runDir); err != nil {
return fmt.Errorf("error while waiting for the steps to finish %w", err)
}
Expand All @@ -102,20 +140,39 @@ func LookForResults(w io.Writer, runDir string, resultsDir string, resultNames [
resultFile := resultFile

g.Go(func() error {
value, err := os.ReadFile(filepath.Join(resultsDir, resultFile))
if os.IsNotExist(err) {
return nil
} else if err != nil {
return fmt.Errorf("error reading the results file %w", err)
newResult, err := readResults(resultsDir, resultFile, "", taskResultType)
if err != nil {
return err
}
newResult := SidecarLogResult{
Name: resultFile,
Value: string(value),
if newResult.Name == "" {
return nil
}
results <- newResult
return nil
})
}

for sName, sresults := range stepResults {
sresults := sresults
sName := sName
for _, resultName := range sresults {
resultName := resultName
stepResultsDir := filepath.Join(pipeline.StepsDir, sName, "results")

g.Go(func() error {
newResult, err := readResults(stepResultsDir, resultName, sName, stepResultType)
if err != nil {
return err
}
if newResult.Name == "" {
return nil
}
results <- newResult
return nil
})
}
}

channelGroup := new(errgroup.Group)
channelGroup.Go(func() error {
if err := g.Wait(); err != nil {
Expand Down Expand Up @@ -183,10 +240,19 @@ func parseResults(resultBytes []byte, maxResultLimit int) (result.RunResult, err
if len(resultBytes) > maxResultLimit {
return runResult, fmt.Errorf("invalid result \"%s\": %w of %d", res.Name, ErrSizeExceeded, maxResultLimit)
}
var resultType result.ResultType
switch res.Type {
case taskResultType:
resultType = result.TaskRunResultType
case stepResultType:
resultType = result.StepResultType
default:
return result.RunResult{}, fmt.Errorf("Invalid sidecar result type %v. Must be %v or %v", res.Type, taskResultType, stepResultType)
}
runResult = result.RunResult{
Key: res.Name,
Value: res.Value,
ResultType: result.TaskRunResultType,
ResultType: resultType,
}
return runResult, nil
}
38 changes: 31 additions & 7 deletions pkg/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package pod

import (
"context"
"encoding/json"
"fmt"
"log"
"math"
Expand Down Expand Up @@ -190,7 +191,10 @@ func (b *Builder) Build(ctx context.Context, taskRun *v1.TaskRun, taskSpec v1.Ta
windows := usesWindows(taskRun)
if sidecarLogsResultsEnabled && taskSpec.Results != nil {
// create a results sidecar
resultsSidecar := createResultsSidecar(taskSpec, b.Images.SidecarLogResultsImage, setSecurityContext, windows)
resultsSidecar, err := createResultsSidecar(taskSpec, b.Images.SidecarLogResultsImage, setSecurityContext, windows)
if err != nil {
return nil, err
}
taskSpec.Sidecars = append(taskSpec.Sidecars, resultsSidecar)
commonExtraEntrypointArgs = append(commonExtraEntrypointArgs, "-result_from", config.ResultExtractionMethodSidecarLogs)
}
Expand Down Expand Up @@ -568,26 +572,46 @@ func entrypointInitContainer(image string, steps []v1.Step, setSecurityContext,
// based on the spec of the Task, the image that should run in the results sidecar,
// whether it will run on a windows node, and whether the sidecar should include a security context
// that will allow it to run in namespaces with "restricted" pod security admission.
func createResultsSidecar(taskSpec v1.TaskSpec, image string, setSecurityContext, windows bool) v1.Sidecar {
func createResultsSidecar(taskSpec v1.TaskSpec, image string, setSecurityContext, windows bool) (v1.Sidecar, error) {
names := make([]string, 0, len(taskSpec.Results))
for _, r := range taskSpec.Results {
names = append(names, r.Name)
}
securityContext := linuxSecurityContext
if windows {
securityContext = windowsSecurityContext
}
resultsStr := strings.Join(names, ",")
command := []string{"/ko-app/sidecarlogresults", "-results-dir", pipeline.DefaultResultPath, "-result-names", resultsStr}

// create a map of container Name to step results
stepResults := map[string][]string{}
for i, s := range taskSpec.Steps {
if len(s.Results) > 0 {
stepName := StepName(s.Name, i)
stepResults[stepName] = make([]string, 0, len(s.Results))
for _, r := range s.Results {
stepResults[stepName] = append(stepResults[stepName], r.Name)
}
}
}

stepResultsBytes, err := json.Marshal(stepResults)
if err != nil {
return v1.Sidecar{}, err
}
if len(stepResultsBytes) > 0 {
command = append(command, "-step-results", string(stepResultsBytes))
}
sidecar := v1.Sidecar{
Name: pipeline.ReservedResultsSidecarName,
Image: image,
Command: command,
}
securityContext := linuxSecurityContext
if windows {
securityContext = windowsSecurityContext
}
if setSecurityContext {
sidecar.SecurityContext = securityContext
}
return sidecar
return sidecar, nil
}

// usesWindows returns true if the TaskRun will run on a windows node,
Expand Down
109 changes: 77 additions & 32 deletions pkg/pod/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,46 @@ func MakeTaskRunStatus(ctx context.Context, logger *zap.SugaredLogger, tr v1.Tas
return *trs, merr.ErrorOrNil()
}

func createTaskResultsFromStepResults(stepRunRes []v1.TaskRunStepResult, neededStepResults map[string]string) []v1.TaskRunResult {
taskResults := []v1.TaskRunResult{}
for _, r := range stepRunRes {
// this result was requested by the Task
if _, ok := neededStepResults[r.Name]; ok {
taskRunResult := v1.TaskRunResult{
Name: neededStepResults[r.Name],
Type: r.Type,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
}
}
return taskResults
}

func getTaskResultsFromSidecarLogs(sidecarLogResults []result.RunResult) []result.RunResult {
taskResultsFromSidecarLogs := []result.RunResult{}
for _, slr := range sidecarLogResults {
if slr.ResultType == result.TaskRunResultType {
taskResultsFromSidecarLogs = append(taskResultsFromSidecarLogs, slr)
}
}
return taskResultsFromSidecarLogs
}

func getStepResultsFromSidecarLogs(sidecarLogResults []result.RunResult, containerName string) []result.RunResult {
stepResultsFromSidecarLogs := []result.RunResult{}
for _, slr := range sidecarLogResults {
if slr.ResultType == result.StepResultType {
stepName, resultName := sidecarlogresults.ExtractStepAndResultFromSidecarResultName(slr.Key)
if stepName == containerName {
slr.Key = resultName
stepResultsFromSidecarLogs = append(stepResultsFromSidecarLogs, slr)
}
}
}
return stepResultsFromSidecarLogs
}

func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredLogger, stepStatuses []corev1.ContainerStatus, tr *v1.TaskRun, podPhase corev1.PodPhase, kubeclient kubernetes.Interface, ts *v1.TaskSpec) *multierror.Error {
trs := &tr.Status
var merr *multierror.Error
Expand All @@ -178,24 +218,53 @@ func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredL

// Extract results from sidecar logs
sidecarLogsResultsEnabled := config.FromContextOrDefaults(ctx).FeatureFlags.ResultExtractionMethod == config.ResultExtractionMethodSidecarLogs
sidecarLogResults := []result.RunResult{}
if sidecarLogsResultsEnabled && tr.Status.TaskSpec.Results != nil {
// extraction of results from sidecar logs
sidecarLogResults, err := sidecarlogresults.GetResultsFromSidecarLogs(ctx, kubeclient, tr.Namespace, tr.Status.PodName, pipeline.ReservedResultsSidecarContainerName, podPhase)
slr, err := sidecarlogresults.GetResultsFromSidecarLogs(ctx, kubeclient, tr.Namespace, tr.Status.PodName, pipeline.ReservedResultsSidecarContainerName, podPhase)
if err != nil {
merr = multierror.Append(merr, err)
}
// populate task run CRD with results from sidecar logs
// since sidecar logs does not support step results yet, it is empty for now.
taskResults, _, _ := filterResults(sidecarLogResults, specResults, []v1.StepResult{})
if tr.IsDone() {
trs.Results = append(trs.Results, taskResults...)
}
sidecarLogResults = append(sidecarLogResults, slr...)
}
// Populate Task results from sidecar logs
taskResultsFromSidecarLogs := getTaskResultsFromSidecarLogs(sidecarLogResults)
taskResults, _, _ := filterResults(taskResultsFromSidecarLogs, specResults, nil)
if tr.IsDone() {
trs.Results = append(trs.Results, taskResults...)
}

// Continue with extraction of termination messages
for _, s := range stepStatuses {
// Avoid changing the original value by modifying the pointer value.
state := s.State.DeepCopy()
taskRunStepResults := []v1.TaskRunStepResult{}

// Identify Step Results
stepResults := []v1.StepResult{}
if ts != nil {
for _, step := range ts.Steps {
if getContainerName(step.Name) == s.Name {
stepResults = append(stepResults, step.Results...)
}
}
}
// Identify StepResults needed by the Task Results
neededStepResults, err := findStepResultsFetchedByTask(s.Name, specResults)
if err != nil {
merr = multierror.Append(merr, err)
}

// populate step results from sidecar logs
stepResultsFromSidecarLogs := getStepResultsFromSidecarLogs(sidecarLogResults, s.Name)
_, stepRunRes, _ := filterResults(stepResultsFromSidecarLogs, specResults, stepResults)
if tr.IsDone() {
taskRunStepResults = append(taskRunStepResults, stepRunRes...)
// Set TaskResults from StepResults
trs.Results = append(trs.Results, createTaskResultsFromStepResults(stepRunRes, neededStepResults)...)
}

// Parse termination messages
if state.Terminated != nil && len(state.Terminated.Message) != 0 {
msg := state.Terminated.Message

Expand All @@ -215,35 +284,11 @@ func setTaskRunStatusBasedOnStepStatus(ctx context.Context, logger *zap.SugaredL
merr = multierror.Append(merr, err)
}

// Identify Step Results
stepResults := []v1.StepResult{}
if ts != nil {
for _, step := range ts.Steps {
if getContainerName(step.Name) == s.Name {
stepResults = append(stepResults, step.Results...)
}
}
}
taskResults, stepRunRes, filteredResults := filterResults(results, specResults, stepResults)
if tr.IsDone() {
taskRunStepResults = append(taskRunStepResults, stepRunRes...)
// Identify StepResults needed by the Task Results
neededStepResults, err := findStepResultsFetchedByTask(s.Name, specResults)
if err != nil {
merr = multierror.Append(merr, err)
}
// Set TaskResults from StepResults
for _, r := range stepRunRes {
// this result was requested by the Task
if _, ok := neededStepResults[r.Name]; ok {
taskRunResult := v1.TaskRunResult{
Name: neededStepResults[r.Name],
Type: r.Type,
Value: r.Value,
}
taskResults = append(taskResults, taskRunResult)
}
}
taskResults = append(taskResults, createTaskResultsFromStepResults(stepRunRes, neededStepResults)...)
trs.Results = append(trs.Results, taskResults...)
}
msg, err = createMessageFromResults(filteredResults)
Expand Down

0 comments on commit ed4d578

Please sign in to comment.