Skip to content

Commit

Permalink
employ some channel best practices to avoid hanging goroutines and me…
Browse files Browse the repository at this point in the history
…mory leaks from unclosed buffered channels

fix panic in e2es and add dump of controller logs on errors

rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero authored and tekton-robot committed Feb 27, 2024
1 parent 2592566 commit 21d96ba
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 34 deletions.
85 changes: 62 additions & 23 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package dynamic

import (
"bytes"
"context"
"fmt"
"time"
Expand Down Expand Up @@ -355,29 +356,31 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {

func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, logName string) error {
logger := logging.FromContext(ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logsClient, err := r.resultsClient.UpdateLog(ctx)
if err != nil {
return fmt.Errorf("failed to create UpdateLog client: %w", err)
}

writer := logs.NewBufferedWriter(logsClient, logName, logs.DefaultBufferSize)

inMemWriteBufferStdout := bytes.NewBuffer(make([]byte, 0))
inMemWriteBufferStderr := bytes.NewBuffer(make([]byte, 0))
tknParams := &cli.TektonParams{}
tknParams.SetNamespace(o.GetNamespace())
// KLUGE: tkn reader.Read() will raise an error if a step in the TaskRun failed and there is no
// Err writer in the Stream object. This will result in some "error" messages being written to
// the log.
// the log. That, coupled with the fact that the tkn client wraps and often masks errors,
// makes it impossible to differentiate between retryable and permanent k8s errors with respect
// to retrying reconciliation in this controller.

reader, err := tknlog.NewReader(logType, &tknopts.LogOptions{
AllSteps: true,
Params: tknParams,
PipelineRunName: o.GetName(),
TaskrunName: o.GetName(),
Stream: &cli.Stream{
Out: writer,
Err: writer,
Out: inMemWriteBufferStdout,
Err: inMemWriteBufferStderr,
},
})
if err != nil {
Expand All @@ -388,27 +391,63 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,
return fmt.Errorf("error reading from tkn reader: %w", err)
}

errChanRepeater := make(chan error)
go func(echan <-chan error, o metav1.Object) {
writeErr := <-echan
errChanRepeater <- writeErr
tknlog.NewWriter(logType, true).Write(&cli.Stream{
Out: inMemWriteBufferStdout,
Err: inMemWriteBufferStderr,
}, logChan, errChan)

bufStdout := inMemWriteBufferStdout.Bytes()
cntStdout, writeStdOutErr := writer.Write(bufStdout)
if writeStdOutErr != nil {
logger.Warnw("streamLogs in mem bufStdout write err",
zap.String("error", writeStdOutErr.Error()),
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)
}
if cntStdout != len(bufStdout) {
logger.Warnw("streamLogs bufStdout write len inconsistent",
zap.Int("in", len(bufStdout)),
zap.Int("out", cntStdout),
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)

_, err := writer.Flush()
if err != nil {
logger.Error(err)
}
if err = logsClient.CloseSend(); err != nil {
logger.Error(err)
}
}(errChan, o)
}
bufStderr := inMemWriteBufferStderr.Bytes()
// we do not write these errors to the results api server

// TODO: we may need to somehow discern the precise nature of the errors here and adjust how
// we return accordingly.
if len(bufStderr) > 0 {
errStr := string(bufStderr)
logger.Warnw("tkn client std error output",
zap.String("name", o.GetName()),
zap.String("errStr", errStr))
}

// errChanRepeater receives stderr from the TaskRun containers.
// This will be forwarded as combined output (stdout and stderr)
flushCount, flushErr := writer.Flush()
logger.Warnw("flush ret count",
zap.String("name", o.GetName()),
zap.Int("flushCount", flushCount))
if flushErr != nil {
logger.Warnw("flush ret err",
zap.String("error", flushErr.Error()))
logger.Error(flushErr)
return flushErr
}
if closeErr := logsClient.CloseSend(); closeErr != nil {
logger.Warnw("CloseSend ret err",
zap.String("name", o.GetName()),
zap.String("error", closeErr.Error()))
logger.Error(closeErr)
return closeErr
}

tknlog.NewWriter(logType, true).Write(&cli.Stream{
Out: writer,
Err: writer,
}, logChan, errChanRepeater)
logger.Debugw("Exiting streamLogs",
zap.String("namespace", o.GetNamespace()),
zap.String("name", o.GetName()),
)

return nil
}
93 changes: 82 additions & 11 deletions test/e2e/e2e_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
package e2e

import (
"bytes"
"context"
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"testing"

resultsv1alpha2 "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"google.golang.org/genproto/googleapis/api/httpbody"

"strings"
"time"
Expand Down Expand Up @@ -75,6 +80,10 @@ func TestGCSLog(t *testing.T) {
}
return false, nil
}); err != nil {
t.Log("dumping watcher logs")
podLogs(t, "tekton-pipelines", "watcher")
t.Log("dumping api logs")
podLogs(t, "tekton-pipelines", "api")
t.Fatalf("Error waiting for PipelineRun creation: %v", err)
}
})
Expand All @@ -93,17 +102,79 @@ func TestGCSLog(t *testing.T) {
if logName == "" {
t.Skip("log name not found")
}
logClient, err := gc.GetLog(context.Background(), &resultsv1alpha2.GetLogRequest{Name: logName})
if err != nil {
t.Errorf("Error getting Log Client: %v", err)
}
log, err := logClient.Recv()
if err != nil {
t.Errorf("Error getting Log: %v", err)
}
want := "[hello : hello] hello world!"
if !strings.Contains(string(log.Data), want) {
t.Errorf("Log Data inconsistent got: %s, doesn't have: %s", string(log.Data), want)
if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (done bool, err error) {
logClient, err := gc.GetLog(context.Background(), &resultsv1alpha2.GetLogRequest{Name: logName})
if err != nil {
t.Logf("Error getting Log Client: %v", err)
return false, nil
}
var log *httpbody.HttpBody
var cerr error
log, cerr = logClient.Recv()
if cerr != nil {
t.Logf("Error getting Log for %s: %v", logName, cerr)
return false, nil
}
want := "[hello : hello] hello world!"
if log == nil {
t.Logf("Nil return from logClient.Recv()")
return false, nil
}
if !strings.Contains(string(log.Data), want) {
t.Logf("Log Data inconsistent for %s got: %s, doesn't have: %s", logName, string(log.Data), want)
return false, nil
}
return true, nil

}); err != nil {
t.Log("dumping watcher logs")
podLogs(t, "tekton-pipelines", "watcher")
t.Log("dumping api logs")
podLogs(t, "tekton-pipelines", "api")
t.Fatalf("Error waiting for check log: %v", err)
}
})
}

// podLogs writes to the test log the container logs of the first pod in
// namespace ns whose name contains the substring name.
//
// A fresh clientset is built from clientConfig(t) on every call. If listing
// the pods fails, the error is reported through t.Errorf and the function
// returns without touching the list result. If no pod name contains the
// pattern, that fact is logged so an empty dump is not mistaken for empty
// pod logs.
func podLogs(t *testing.T, ns string, name string) {
	t.Logf("getting pod logs for the pattern %s", name)
	clientset := kubernetes.NewForConfigOrDie(clientConfig(t))
	ctx := context.Background()
	list, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
	if err != nil {
		t.Errorf("pod list error %s", err)
		// The list result is meaningless (and may be nil) on error; do not
		// range over it.
		return
	}
	for _, pod := range list.Items {
		if !strings.Contains(pod.Name, name) {
			continue
		}
		t.Logf("found pod %s matcher pattern %s", pod.Name, name)
		for _, c := range pod.Spec.Containers {
			containerLogs(t, ctx, ns, pod.Name, c.Name)
		}
		// Only the first matching pod is dumped.
		return
	}
	t.Logf("no pod found in namespace %s matching pattern %s", ns, name)
}

// containerLogs fetches the logs of container containerName in pod podName
// (namespace ns) and writes them to the test log as a single entry.
//
// A clientset is built from clientConfig(t) for the request. Failures to open
// the log stream or to read it are reported with t.Errorf and end the call
// early; nothing is printed in that case.
func containerLogs(t *testing.T, ctx context.Context, ns, podName, containerName string) {
	t.Logf("print container %s from pod %s:", containerName, podName)

	opts := corev1.PodLogOptions{Container: containerName}
	stream, err := kubernetes.NewForConfigOrDie(clientConfig(t)).
		CoreV1().
		Pods(ns).
		GetLogs(podName, &opts).
		Stream(ctx)
	if err != nil {
		t.Errorf("error streaming pod logs %s", err.Error())
		return
	}
	defer stream.Close()

	// Drain the whole stream into memory so it is emitted as one log entry.
	var out bytes.Buffer
	if _, err := io.Copy(&out, stream); err != nil {
		t.Errorf("error copying pod logs %s", err.Error())
		return
	}
	t.Logf("%s", out.String())
}

0 comments on commit 21d96ba

Please sign in to comment.