diff --git a/pkg/kube/client.go b/pkg/kube/client.go index c4dcc1a4e9..444bbf7499 100644 --- a/pkg/kube/client.go +++ b/pkg/kube/client.go @@ -21,8 +21,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - - _ "k8s.io/client-go/plugin/pkg/client/auth" ) func newClientConfig() clientcmd.ClientConfig { diff --git a/pkg/kube/exec.go b/pkg/kube/exec.go index 7e628d27a2..fed453b297 100644 --- a/pkg/kube/exec.go +++ b/pkg/kube/exec.go @@ -149,33 +149,16 @@ func ExecWithOptions(ctx context.Context, kubeCli kubernetes.Interface, options if err != nil { return err } - - errCh := execStream(ctx, kubeCli, config, options) - err = <-errCh - if err != nil { - return errkit.Wrap(err, "Failed to exec command in pod") - } - - return nil + err = execStreamWithLogTail(ctx, kubeCli, config, options) + return errkit.Wrap(err, "Failed to exec command in pod") } -func execStream( +func execStreamWithLogTail( ctx context.Context, kubeCli kubernetes.Interface, config *restclient.Config, options ExecOptions, -) chan error { - const tty = false - req := kubeCli.CoreV1().RESTClient().Post(). - Resource("pods"). - Name(options.PodName). - Namespace(options.Namespace). - SubResource("exec") - - if len(options.ContainerName) != 0 { - req.Param("container", options.ContainerName) - } - +) error { stderrTail := NewLogTail(logTailDefaultLength) stdoutTail := NewLogTail(logTailDefaultLength) @@ -183,41 +166,66 @@ func execStream( if options.Stdout != nil { stdout = io.MultiWriter(options.Stdout, stdoutTail) } + options.Stdout = stdout var stderr io.Writer = stderrTail if options.Stderr != nil { stderr = io.MultiWriter(options.Stderr, stderrTail) } + options.Stderr = stderr + + u, err := requestURL(kubeCli, options) + if err != nil { + return err + } + + err = execStream(ctx, u, config, options) + if err != nil { + return NewExecError(err, stdoutTail, stderrTail) + } + return nil +} + +func requestURL(kubeCli kubernetes.Interface, options ExecOptions) (*url.URL, error) { + const tty = false + req := kubeCli.CoreV1().RESTClient().Post(). + Resource("pods"). + Name(options.PodName). + Namespace(options.Namespace). + SubResource("exec") + + if len(options.ContainerName) != 0 { + req.Param("container", options.ContainerName) + } req.VersionedParams(&corev1.PodExecOptions{ Container: options.ContainerName, Command: options.Command, Stdin: options.Stdin != nil, - Stdout: stdout != nil, - Stderr: stderr != nil, + Stdout: options.Stdout != nil, + Stderr: options.Stderr != nil, TTY: tty, }, scheme.ParameterCodec) + return req.URL(), req.Error() +} - errCh := make(chan error, 1) - go func() { - err := execute( - ctx, - "POST", - req.URL(), - config, - options.Stdin, - stdout, - stderr, - tty) - - if err != nil { - err = NewExecError(err, stdoutTail, stderrTail) - } - - errCh <- err - }() - - return errCh +func execStream( + ctx context.Context, + reqURL *url.URL, + config *restclient.Config, + options ExecOptions, +) error { + const tty = false + err := execute( + ctx, + "POST", + reqURL, + config, + options.Stdin, + options.Stdout, + options.Stderr, + tty) + return err } func execute( @@ -234,7 +242,6 @@ func execute( if err != nil { return err } - // Get context from a caller function. Issue to track: https://github.com/kanisterio/kanister/issues/1930 return exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdin: stdin, Stdout: stdout, diff --git a/pkg/kube/exec_test.go b/pkg/kube/exec_test.go index 9242abb4ec..374261a171 100644 --- a/pkg/kube/exec_test.go +++ b/pkg/kube/exec_test.go @@ -15,7 +15,7 @@ //go:build !unit // +build !unit -package kube +package kube_test import ( "bytes"