Skip to content

Commit

Permalink
Refactor execStream
Browse files Browse the repository at this point in the history
  • Loading branch information
tdmanv committed Nov 14, 2024
1 parent 5c7ac49 commit 89afcce
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 47 deletions.
2 changes: 0 additions & 2 deletions pkg/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

_ "k8s.io/client-go/plugin/pkg/client/auth"
)

func newClientConfig() clientcmd.ClientConfig {
Expand Down
95 changes: 51 additions & 44 deletions pkg/kube/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,75 +149,83 @@ func ExecWithOptions(ctx context.Context, kubeCli kubernetes.Interface, options
if err != nil {
return err
}

errCh := execStream(ctx, kubeCli, config, options)
err = <-errCh
if err != nil {
return errkit.Wrap(err, "Failed to exec command in pod")
}

return nil
err = execStreamWithLogTail(ctx, kubeCli, config, options)
return errkit.Wrap(err, "Failed to exec command in pod")
}

func execStream(
func execStreamWithLogTail(
ctx context.Context,
kubeCli kubernetes.Interface,
config *restclient.Config,
options ExecOptions,
) chan error {
const tty = false
req := kubeCli.CoreV1().RESTClient().Post().
Resource("pods").
Name(options.PodName).
Namespace(options.Namespace).
SubResource("exec")

if len(options.ContainerName) != 0 {
req.Param("container", options.ContainerName)
}

) error {
stderrTail := NewLogTail(logTailDefaultLength)
stdoutTail := NewLogTail(logTailDefaultLength)

var stdout io.Writer = stdoutTail
if options.Stdout != nil {
stdout = io.MultiWriter(options.Stdout, stdoutTail)
}
options.Stdout = stdout

var stderr io.Writer = stderrTail
if options.Stderr != nil {
stderr = io.MultiWriter(options.Stderr, stderrTail)
}
options.Stderr = stderr

u, err := requestURL(kubeCli, options)
if err != nil {
return err
}

err = execStream(ctx, u, config, options)
if err != nil {
return NewExecError(err, stdoutTail, stderrTail)
}
return nil
}

func requestURL(kubeCli kubernetes.Interface, options ExecOptions) (*url.URL, error) {
const tty = false
req := kubeCli.CoreV1().RESTClient().Post().
Resource("pods").
Name(options.PodName).
Namespace(options.Namespace).
SubResource("exec")

if len(options.ContainerName) != 0 {
req.Param("container", options.ContainerName)
}

req.VersionedParams(&corev1.PodExecOptions{
Container: options.ContainerName,
Command: options.Command,
Stdin: options.Stdin != nil,
Stdout: stdout != nil,
Stderr: stderr != nil,
Stdout: options.Stdout != nil,
Stderr: options.Stderr != nil,
TTY: tty,
}, scheme.ParameterCodec)
return req.URL(), req.Error()
}

errCh := make(chan error, 1)
go func() {
err := execute(
ctx,
"POST",
req.URL(),
config,
options.Stdin,
stdout,
stderr,
tty)

if err != nil {
err = NewExecError(err, stdoutTail, stderrTail)
}

errCh <- err
}()

return errCh
func execStream(
ctx context.Context,
reqURL *url.URL,
config *restclient.Config,
options ExecOptions,
) error {
const tty = false
err := execute(
ctx,
"POST",
reqURL,
config,
options.Stdin,
options.Stdout,
options.Stderr,
tty)
return err
}

func execute(
Expand All @@ -234,7 +242,6 @@ func execute(
if err != nil {
return err
}
// Get context from a caller function. Issue to track: https://github.com/kanisterio/kanister/issues/1930
return exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kube/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//go:build !unit
// +build !unit

package kube
package kube_test

import (
"bytes"
Expand Down

0 comments on commit 89afcce

Please sign in to comment.