Skip to content

Commit

Permalink
TEP-0142: Surface step results via termination message
Browse files Browse the repository at this point in the history
This PR surfaces step results (i.e results written to $(step.results.<resultName>.path)) via termination messages.  A followup PR will handle surfacing the results via sidecar logs.
  • Loading branch information
chitrangpatel committed Nov 13, 2023
1 parent 54bda50 commit 751dda7
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 28 deletions.
9 changes: 9 additions & 0 deletions cmd/entrypoint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (
enableSpire = flag.Bool("enable_spire", false, "If specified by configmap, this enables spire signing and verification")
socketPath = flag.String("spire_socket_path", "unix:///spiffe-workload-api/spire-agent.sock", "Experimental: The SPIRE agent socket for SPIFFE workload API.")
resultExtractionMethod = flag.String("result_from", featureFlags.ResultExtractionMethodTerminationMessage, "The method using which to extract results from tasks. Default is using the termination message.")
stepName = flag.String("step_name", "", "Name of the step")
stepResults = flag.String("step_results", "", "step results if specified")
)

const (
Expand Down Expand Up @@ -145,6 +147,11 @@ func main() {
}
spireWorkloadAPI = spire.NewEntrypointerAPIClient(&spireConfig)
}
stepRes := map[string]string{}
err := json.Unmarshal([]byte(*stepResults), &stepRes)
if err != nil {
log.Fatal(err)
}

e := entrypoint.Entrypointer{
Command: append(cmd, commandArgs...),
Expand All @@ -159,11 +166,13 @@ func main() {
},
PostWriter: &realPostWriter{},
Results: strings.Split(*results, ","),
StepResults: stepRes,
Timeout: timeout,
BreakpointOnFailure: *breakpointOnFailure,
OnError: *onError,
StepMetadataDir: *stepMetadataDir,
SpireWorkloadAPI: spireWorkloadAPI,
StepName: *stepName,
ResultExtractionMethod: *resultExtractionMethod,
}

Expand Down
56 changes: 56 additions & 0 deletions examples/v1/taskruns/alpha/stepaction-results.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
apiVersion: tekton.dev/v1alpha1
kind: StepAction
metadata:
name: step-action-uri
spec:
results:
- name: uri
params:
- name: uri
image: alpine
script: |
echo $(params.uri) > $(step.results.uri.path)
---
apiVersion: tekton.dev/v1alpha1
kind: StepAction
metadata:
name: step-action-uri-digest
spec:
results:
- name: digest
- name: uri
params:
- name: uri
- name: digest
image: alpine
script: |
echo $(params.digest) > $(step.results.digest.path)
echo $(params.uri) > $(step.results.uri.path)
---
apiVersion: tekton.dev/v1
kind: TaskRun
metadata:
name: step-action-run
spec:
TaskSpec:
results:
- name: step-1-uri
value: $(steps.step1.results.uri)
- name: step-2-uri
value: $(steps.step2.results.uri)
- name: digest
steps:
- name: step1
ref:
name: step-action-uri
params:
- name: uri
value: "https://github.com/tektoncd/pipeline"
name: step2
- ref:
name: step-action-uri-digest
params:
- name: uri
value: "https://github.com/tektoncd/other"
- name: digest
value: "c8381846241cac4c93c30b6a5ac04cac51fa0a6e"
27 changes: 25 additions & 2 deletions pkg/entrypoint/entrypointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type Entrypointer struct {
// PostWriter encapsulates writing files when complete.
PostWriter PostWriter

// StepResults is the set of files that might contain step results
StepResults map[string]string
// Results is the set of files that might contain task results
Results []string
// Timeout is an optional user-specified duration within which the Step must complete
Expand All @@ -110,6 +112,8 @@ type Entrypointer struct {
StepMetadataDir string
// SpireWorkloadAPI connects to spire and does obtains SVID based on taskrun
SpireWorkloadAPI spire.EntrypointerAPIClient
// StepName is the name of the step
StepName string
// ResultsDirectory is the directory to find results, defaults to pipeline.DefaultResultPath
ResultsDirectory string
// ResultExtractionMethod is the method using which the controller extracts the results from the task pod.
Expand All @@ -136,6 +140,7 @@ type PostWriter interface {
// Go optionally waits for a file, runs the command, and writes a
// post file.
func (e Entrypointer) Go() error {
var err error
prod, _ := zap.NewProduction()
logger := prod.Sugar()

Expand All @@ -147,6 +152,11 @@ func (e Entrypointer) Go() error {
_ = logger.Sync()
}()

if e.StepName != "" && e.StepResults != nil {
if err := os.MkdirAll(filepath.Join(pipeline.StepsDir, e.StepName, "results"), os.ModePerm); err != nil {
return err
}
}
for _, f := range e.WaitFiles {
if err := e.Waiter.Wait(context.Background(), f, e.WaitFileContent, e.BreakpointOnFailure); err != nil {
// An error happened while waiting, so we bail
Expand All @@ -170,7 +180,6 @@ func (e Entrypointer) Go() error {
ResultType: result.InternalTektonResultType,
})

var err error
if e.Timeout != nil && *e.Timeout < time.Duration(0) {
err = fmt.Errorf("negative timeout specified")
}
Expand Down Expand Up @@ -232,7 +241,7 @@ func (e Entrypointer) Go() error {

// strings.Split(..) with an empty string returns an array that contains one element, an empty string.
// This creates an error when trying to open the result folder as a file.
if len(e.Results) >= 1 && e.Results[0] != "" {
if (len(e.Results) >= 1 && e.Results[0] != "") || len(e.StepResults) > 0 {
resultPath := pipeline.DefaultResultPath
if e.ResultsDirectory != "" {
resultPath = e.ResultsDirectory
Expand Down Expand Up @@ -264,6 +273,20 @@ func (e Entrypointer) readResultsFromDisk(ctx context.Context, resultDir string)
ResultType: result.TaskRunResultType,
})
}
for resultName, resultPath := range e.StepResults {
fileContents, err := os.ReadFile(resultPath)
if os.IsNotExist(err) {
continue
} else if err != nil {
return err
}
// if the file doesn't exist, ignore it
output = append(output, result.RunResult{
Key: resultName,
Value: string(fileContents),
ResultType: result.TaskRunResultType,
})
}
if e.SpireWorkloadAPI != nil {
signed, err := e.SpireWorkloadAPI.Sign(ctx, output)
if err != nil {
Expand Down
58 changes: 53 additions & 5 deletions pkg/pod/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ var (
// command, we must have fetched the image's ENTRYPOINT before calling this
// method, using entrypoint_lookup.go.
// Additionally, Step timeouts are added as entrypoint flag.
func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Container, taskSpec *v1.TaskSpec, breakpointConfig *v1.TaskRunDebug, waitForReadyAnnotation, enableKeepPodOnCancel bool) ([]corev1.Container, error) {
func orderContainers(ctx context.Context, commonExtraEntrypointArgs []string, steps []corev1.Container, taskSpec *v1.TaskSpec, breakpointConfig *v1.TaskRunDebug, waitForReadyAnnotation, enableKeepPodOnCancel bool) ([]corev1.Container, error) {
if len(steps) == 0 {
return nil, errors.New("No steps specified")
}
Expand All @@ -143,12 +143,17 @@ func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Containe
} else { // Not the first step - wait for previous
argsForEntrypoint = append(argsForEntrypoint, "-wait_file", filepath.Join(RunDir, strconv.Itoa(i-1), "out"))
}
stepName := StepName(s.Name, i, false)
argsForEntrypoint = append(argsForEntrypoint,
// Start next step.
"-post_file", filepath.Join(RunDir, idx, "out"),
"-termination_path", terminationPath,
"-step_metadata_dir", filepath.Join(RunDir, idx, "status"),
)
if config.FromContextOrDefaults(ctx).FeatureFlags.EnableStepActions {
argsForEntrypoint = append(argsForEntrypoint, "-step_name", stepName)
}

argsForEntrypoint = append(argsForEntrypoint, commonExtraEntrypointArgs...)
if taskSpec != nil {
if taskSpec.Steps != nil && len(taskSpec.Steps) >= i+1 {
Expand All @@ -170,6 +175,13 @@ func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Containe
}
}
argsForEntrypoint = append(argsForEntrypoint, resultArgument(steps, taskSpec.Results)...)
if config.FromContextOrDefaults(ctx).FeatureFlags.EnableStepActions {
stepResultArgs, err := stepResultArgument(taskSpec.Results, stepName)
if err != nil {
return nil, err
}
argsForEntrypoint = append(argsForEntrypoint, stepResultArgs...)
}
}

if breakpointConfig != nil && breakpointConfig.NeedsDebugOnFailure() {
Expand Down Expand Up @@ -199,6 +211,36 @@ func orderContainers(commonExtraEntrypointArgs []string, steps []corev1.Containe
return steps, nil
}

func stepResultArgument(results []v1.TaskResult, stepName string) ([]string, error) {
if len(results) == 0 {
return nil, nil
}
if stepName == "" {
return nil, nil
}
res := map[string]string{}
for _, r := range results {
if r.Value != nil {
if r.Value.StringVal != "" {
sName, resultName, err := v1.ExtractStepResultName(r.Value.StringVal)
if err != nil {
return nil, err
}
if stepName == sName {
res[r.Name] = filepath.Join(pipeline.StepsDir, stepName, "results", resultName)
}
}
} else {
res[r.Name] = filepath.Join(pipeline.StepsDir, stepName, "results", r.Name)
}
}
resBytes, err := json.Marshal(res)
if err != nil {
return nil, err
}
return []string{"-step_results", string(resBytes)}, nil
}

func resultArgument(steps []corev1.Container, results []v1.TaskResult) []string {
if len(results) == 0 {
return nil
Expand All @@ -209,7 +251,9 @@ func resultArgument(steps []corev1.Container, results []v1.TaskResult) []string
func collectResultsName(results []v1.TaskResult) string {
var resultNames []string
for _, r := range results {
resultNames = append(resultNames, r.Name)
if r.Value == nil {
resultNames = append(resultNames, r.Name)
}
}
return strings.Join(resultNames, ",")
}
Expand Down Expand Up @@ -333,9 +377,13 @@ func TrimSidecarPrefix(name string) string { return strings.TrimPrefix(name, sid

// StepName returns the step name after adding "step-" prefix to the actual step name or
// returns "step-unnamed-<step-index>" if not specified
func StepName(name string, i int) string {
func StepName(name string, i int, addPrefix bool) string {
prefix := ""
if addPrefix {
prefix = stepPrefix
}
if name != "" {
return fmt.Sprintf("%s%s", stepPrefix, name)
return fmt.Sprintf("%s%s", prefix, name)
}
return fmt.Sprintf("%sunnamed-%d", stepPrefix, i)
return fmt.Sprintf("%sunnamed-%d", prefix, i)
}
20 changes: 10 additions & 10 deletions pkg/pod/entrypoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestOrderContainers(t *testing.T) {
},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, nil, nil, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, nil, nil, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func TestOrderContainersWithResultsSidecarLogs(t *testing.T) {
},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true, false)
got, err := orderContainers(context.Background(), []string{"-dont_send_results_to_termination_path"}, steps, nil, nil, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func TestOrderContainersWithNoWait(t *testing.T) {
VolumeMounts: []corev1.VolumeMount{volumeMount},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, nil, nil, false, false)
got, err := orderContainers(context.Background(), []string{}, steps, nil, nil, false, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestOrderContainersWithDebugOnFailure(t *testing.T) {
OnFailure: "enabled",
},
}
got, err := orderContainers([]string{}, steps, nil, taskRunDebugConfig, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, nil, taskRunDebugConfig, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestOrderContainersWithEnabelKeepPodOnCancel(t *testing.T) {
VolumeMounts: []corev1.VolumeMount{downwardMount},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, nil, nil, false, true)
got, err := orderContainers(context.Background(), []string{}, steps, nil, nil, false, true)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestEntryPointResults(t *testing.T) {
},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, &taskSpec, nil, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -392,7 +392,7 @@ func TestEntryPointResultsSingleStep(t *testing.T) {
VolumeMounts: []corev1.VolumeMount{downwardMount},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, &taskSpec, nil, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -429,7 +429,7 @@ func TestEntryPointSingleResultsSingleStep(t *testing.T) {
VolumeMounts: []corev1.VolumeMount{downwardMount},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, &taskSpec, nil, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down Expand Up @@ -500,7 +500,7 @@ func TestEntryPointOnError(t *testing.T) {
err: errors.New("task step onError must be either \"continue\" or \"stopAndFail\" but it is set to an invalid value \"invalid-on-error\""),
}} {
t.Run(tc.desc, func(t *testing.T) {
got, err := orderContainers([]string{}, steps, &tc.taskSpec, nil, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, &tc.taskSpec, nil, true, false)
if len(tc.wantContainers) == 0 {
if err == nil {
t.Fatalf("expected an error for an invalid value for onError but received none")
Expand Down Expand Up @@ -599,7 +599,7 @@ func TestEntryPointStepOutputConfigs(t *testing.T) {
},
TerminationMessagePath: "/tekton/termination",
}}
got, err := orderContainers([]string{}, steps, &taskSpec, nil, true, false)
got, err := orderContainers(context.Background(), []string{}, steps, &taskSpec, nil, true, false)
if err != nil {
t.Fatalf("orderContainers: %v", err)
}
Expand Down
Loading

0 comments on commit 751dda7

Please sign in to comment.