From c407b49db08a8333aeaa1a3830f1cd18a0da31a6 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Tue, 29 Aug 2023 10:05:42 -0700 Subject: [PATCH] simplify command package Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/buffer.go | 25 --- pkg/receiver/jobreceiver/command/command.go | 163 +++++--------- .../jobreceiver/command/command_test.go | 211 ++++++++++-------- pkg/receiver/jobreceiver/command/doc.go | 8 - .../jobreceiver/command/proc_group_linux.go | 11 - .../jobreceiver/command/proc_group_unix.go | 14 -- .../jobreceiver/command/proc_group_windows.go | 11 - pkg/receiver/jobreceiver/command/proc_unix.go | 20 -- .../jobreceiver/command/proc_windows.go | 60 ----- .../jobreceiver/output/consumer/consumer.go | 6 +- pkg/receiver/jobreceiver/runner.go | 11 +- 11 files changed, 181 insertions(+), 359 deletions(-) delete mode 100644 pkg/receiver/jobreceiver/command/buffer.go delete mode 100644 pkg/receiver/jobreceiver/command/doc.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_group_linux.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_group_unix.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_group_windows.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_unix.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_windows.go diff --git a/pkg/receiver/jobreceiver/command/buffer.go b/pkg/receiver/jobreceiver/command/buffer.go deleted file mode 100644 index 6e592a56b0..0000000000 --- a/pkg/receiver/jobreceiver/command/buffer.go +++ /dev/null @@ -1,25 +0,0 @@ -package command - -import ( - "bytes" - "sync" -) - -// SyncBuffer can be used to buffer both output streams to -// in a monitoring plugin spec compliant way. -type SyncBuffer struct { - buf bytes.Buffer - mu sync.Mutex -} - -func (s *SyncBuffer) Write(p []byte) (n int, err error) { - s.mu.Lock() - defer s.mu.Unlock() - return s.buf.Write(p) -} - -func (s *SyncBuffer) String() string { - s.mu.Lock() - defer s.mu.Unlock() - return s.buf.String() -} diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go index ea4d2bbe4e..0269798dd4 100644 --- a/pkg/receiver/jobreceiver/command/command.go +++ b/pkg/receiver/jobreceiver/command/command.go @@ -2,10 +2,7 @@ package command import ( "context" - "fmt" "io" - "math" - "os" "os/exec" "sync" "syscall" @@ -13,18 +10,10 @@ import ( ) const ( - // TimeoutOutput specifies the command execution output in the - // event of an execution timeout. - TimeoutOutput string = "Execution timed out\n" - // OKExitStatus specifies the command execution exit status // that indicates a success, A-OK. OKExitStatus int = 0 - // TimeoutExitStatus specifies the command execution exit - // status in the event of an execution timeout. - TimeoutExitStatus int = 2 - // FallbackExitStatus specifies the command execution exit // status used when golang is unable to determine the exit // status. @@ -34,8 +23,7 @@ const ( ErrAlreadyExecuted = cError("invocation has already been started once") ) -// ExecutionRequest provides information about a system command execution, -// somewhat of an abstraction intended to be used for monitoring jobs +// ExecutionRequest type ExecutionRequest struct { // Command is the command to be executed. Command string @@ -43,55 +31,49 @@ type ExecutionRequest struct { // Arguments to execute the command with. Arguments []string - // Env ... + // Env variables to include Env []string // Execution timeout Timeout time.Duration } -type Invocation interface { - // Stdout stream. Must by closed by the caller. - Stdout() io.ReadCloser - // Stderr stream. Must by closed by the caller. - Stderr() io.ReadCloser - // Run the command. Only valid once - Run(context.Context) (ExecutionResponse, error) -} - -// NewInvocation based on an ExecutionRequest -func NewInvocation(request ExecutionRequest) (Invocation, error) { - c := &invocation{ExecutionRequest: request} - var err error - c.stdoutReader, c.stdoutWritter, err = os.Pipe() - if err != nil { - return c, err +// NewExecution based on an ExecutionRequest +func NewExecution(ctx context.Context, request ExecutionRequest) *Execution { + ctx, cancel := context.WithCancel(ctx) + cmd := exec.CommandContext(ctx, request.Command, request.Arguments...) + if len(request.Env) > 0 { + cmd.Env = request.Env + } + return &Execution{ + cmd: cmd, + ctx: ctx, + cancel: cancel, + timeout: request.Timeout, } - c.stderrReader, c.stderrWritter, err = os.Pipe() - return c, err } -type invocation struct { - ExecutionRequest - - stdoutWritter io.WriteCloser - stderrWritter io.WriteCloser - stdoutReader io.ReadCloser - stderrReader io.ReadCloser +type Execution struct { + cmd *exec.Cmd + ctx context.Context + cancel func() + timeout time.Duration mu sync.Mutex started bool } -func (c *invocation) Stdout() io.ReadCloser { - return c.stdoutReader +// Stdout and Stderr return Readers for the underlying execution's output +// streams. Run must be called subsequently or file descriptors may be leaked. +func (c *Execution) Stdout() (io.Reader, error) { + return c.cmd.StdoutPipe() } -func (c *invocation) Stderr() io.ReadCloser { - return c.stderrReader +func (c *Execution) Stderr() (io.Reader, error) { + return c.cmd.StderrPipe() } -func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) { - resp := ExecutionResponse{} +// Run the command. May only be invoked once. +func (c *Execution) Run() (resp ExecutionResponse, err error) { c.mu.Lock() if c.started { defer c.mu.Unlock() @@ -100,91 +82,43 @@ func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) { c.started = true c.mu.Unlock() - // Using a platform specific shell to "cheat", as the shell - // will handle certain failures for us, where golang exec is - // known to have troubles, e.g. command not found. We still - // use a fallback exit status in the unlikely event that the - // exit status cannot be determined. - var cmd *exec.Cmd - - // Use context.WithCancel for command execution timeout. - // context.WithTimeout will not kill child/grandchild processes - // (see issues tagged in https://github.com/sensu/sensu-go/issues/781). - // Rather, we will use a timer, CancelFunc and proc functions - // to perform full cleanup. - ctx, timeout := context.WithCancel(ctx) - defer timeout() - - // Taken from Sensu-Spawn (Sensu 1.x.x). - cmd = command(ctx, c.Command, c.Arguments) - - // Set the ENV for the command if it is set - if len(c.Env) > 0 { - cmd.Env = c.Env - } - - cmd.Stdout = c.stdoutWritter - cmd.Stderr = c.stderrWritter - defer func() { - c.stdoutWritter.Close() - c.stderrWritter.Close() - }() + defer c.cancel() started := time.Now() defer func() { resp.Duration = time.Since(started) }() - timer := time.NewTimer(math.MaxInt64) - defer timer.Stop() - if c.Timeout > 0 { - setProcessGroup(cmd) - timer.Stop() - timer = time.NewTimer(c.Timeout) + if c.timeout > 0 { + time.AfterFunc(c.timeout, c.cancel) } - if err := cmd.Start(); err != nil { + if err := c.cmd.Start(); err != nil { // Something unexpected happened when attempting to // fork/exec, return immediately. return resp, err } - waitCh := make(chan struct{}) - var err error - go func() { - err = cmd.Wait() - close(waitCh) - }() - - // Wait for the process to complete or the timer to trigger, whichever comes first. - var killErr error - select { - case <-waitCh: - if err != nil { - // The command most likely return a non-zero exit status. - if exitError, ok := err.(*exec.ExitError); ok { - // Best effort to determine the exit status, this - // should work on Linux, OSX, and Windows. - if status, ok := exitError.Sys().(syscall.WaitStatus); ok { - resp.Status = status.ExitStatus() - } else { - resp.Status = FallbackExitStatus - } + // Wait for the process to complete then attempt to determine the result + err = c.cmd.Wait() + if err != nil { + // The command most likely return a non-zero exit status. + if exitError, ok := err.(*exec.ExitError); ok { + // Best effort to determine the exit status, this + // should work on Linux, OSX, and Windows. + if status, ok := exitError.Sys().(syscall.WaitStatus); ok { + resp.Status = status.ExitStatus() } else { resp.Status = FallbackExitStatus + resp.Error = exitError } } else { - // Everything is A-OK. - resp.Status = OKExitStatus - } - - case <-timer.C: - var killErrOutput string - if killErr = killProcess(cmd); killErr != nil { - killErrOutput = fmt.Sprintf("Unable to TERM/KILL the process: #%d\n", cmd.Process.Pid) + // Probably an I/O error + resp.Status = FallbackExitStatus + resp.Error = err } - timeout() - fmt.Fprintf(c.stderrWritter, "%s%s", TimeoutOutput, killErrOutput) - resp.Status = TimeoutExitStatus + } else { + // Everything is A-OK. + resp.Status = OKExitStatus } return resp, nil @@ -197,6 +131,9 @@ type ExecutionResponse struct { // Duration provides command execution time. Duration time.Duration + + // Error is passed when the outcome of the execution is uncertain + Error error } // cError const error type for sentinels diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go index 803c6fe5b8..b965f0edb8 100644 --- a/pkg/receiver/jobreceiver/command/command_test.go +++ b/pkg/receiver/jobreceiver/command/command_test.go @@ -1,12 +1,15 @@ package command import ( + "bytes" "context" + "flag" "fmt" "io" "os" "strconv" "strings" + "sync" "testing" "time" @@ -14,18 +17,23 @@ import ( "github.com/stretchr/testify/require" ) -// TestHelperProcess is a target that helps this suite emulate -// platform specific executables in a way that is platform agnostic. -func TestHelperProcess(t *testing.T) { - if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { - return +// TestMain lets the test binary emulate other processes +func TestMain(m *testing.M) { + flag.Parse() + + pid := os.Getpid() + if os.Getenv("GO_EXEC_TEST_PID") == "" { + os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid)) + os.Exit(m.Run()) } - var args []string - command := os.Args[3] - if len(os.Args) > 4 { - args = os.Args[4:] + + args := flag.Args() + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "No command\n") + os.Exit(2) } + command, args := args[0], args[1:] switch command { case "echo": fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " ")) @@ -43,110 +51,133 @@ func TestHelperProcess(t *testing.T) { time.Sleep(time.Second * time.Duration(i)) return } + case "fork": + childCommand := NewExecution(context.Background(), ExecutionRequest{ + Command: os.Args[0], + Arguments: args, + }) + _, err := childCommand.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "fork error: %v", err) + os.Exit(3) + } } - } //nolint:all func TestExecute(t *testing.T) { - ctx := context.Background() // Basic command execution - echo := fakeCommand(t, "echo", "hello", "world") - outC := eventualOutput(echo) - resp, err := echo.Run(ctx) - require.NoError(t, err) - assert.Equal(t, 0, resp.Status) - require.NoError(t, err) - assert.Contains(t, <-outC, "hello world") + t.Run("basic", func(t *testing.T) { + echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}})) + outC := eventualOutput(echo) + resp, err := echo.Run() + require.NoError(t, err) + assert.Equal(t, 0, resp.Status) + require.NoError(t, err) + assert.Contains(t, <-outC, "hello world") + }) // Command exits non-zero - exitCmd := fakeCommand(t, "exit", "1") - eventualOutput(exitCmd) - resp, err = exitCmd.Run(ctx) - require.NoError(t, err) - assert.Equal(t, 1, resp.Status) - exitCmd = fakeCommand(t, "exit", "33") - eventualOutput(exitCmd) - resp, err = exitCmd.Run(ctx) - require.NoError(t, err) - assert.Equal(t, 33, resp.Status) - - // Command canceled by context - timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100) - sleepCmd := fakeCommand(t, "sleep", "1m") - eventualOutput(sleepCmd) - done := make(chan struct{}) - go func() { - resp, err = sleepCmd.Run(timeoutCtx) + t.Run("exit code 1", func(t *testing.T) { + exitCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"1"}})) + resp, err := exitCmd.Run() require.NoError(t, err) - close(done) - }() - select { - case <-time.After(time.Second): - t.Errorf("command context expired but was not killed") - case <-done: - // okay - } - cancel() + assert.Equal(t, 1, resp.Status) + exitCmd = NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"33"}})) + resp, err = exitCmd.Run() + require.NoError(t, err) + assert.Equal(t, 33, resp.Status) + }) // Command exceeds timeout - sleepCmd = fakeCommand(t, "sleep", "1m") - sleepCmd.Timeout = time.Millisecond * 100 - eventualOutput(sleepCmd) - done = make(chan struct{}) - go func() { - resp, err := sleepCmd.Run(ctx) - assert.NoError(t, err) - assert.Equal(t, TimeoutExitStatus, resp.Status) - close(done) - }() - select { - case <-time.After(5 * time.Second): - t.Errorf("command timeout exceeded but was not killed") - case <-done: - // okay - } + t.Run("exceeds timeout", func(t *testing.T) { + timeout := time.Millisecond * 100 + sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "sleep", Arguments: []string{"1m"}, Timeout: timeout})) + done := make(chan struct{}) + go func() { + resp, err := sleepCmd.Run() + assert.NoError(t, err) + assert.NotEqual(t, OKExitStatus, resp.Status) + assert.LessOrEqual(t, timeout, resp.Duration) + close(done) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("command timeout exceeded but was not killed") + case <-done: + // okay + } + }) + // Command exceeds timeout with child process + t.Run("exceeds timeout with child", func(t *testing.T) { + timeout := time.Millisecond * 100 + sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "fork", Arguments: []string{"sleep", "1m"}, Timeout: timeout})) + done := make(chan struct{}) + go func() { + resp, err := sleepCmd.Run() + assert.NoError(t, err) + assert.NotEqual(t, OKExitStatus, resp.Status) + assert.LessOrEqual(t, timeout, resp.Duration) + close(done) + }() + select { + case <-time.After(5 * time.Second): + t.Fatal("command timeout exceeded but was not killed") + case <-done: + // okay + } + }) // Invocation cannot be spuriously re-invoked - echo = fakeCommand(t, "echo", "hello", "world") - outC = eventualOutput(echo) - echo.Run(ctx) - _, err = echo.Run(ctx) - assert.ErrorIs(t, ErrAlreadyExecuted, err) - assert.Contains(t, <-outC, "hello world") + t.Run("cannot be ran twice", func(t *testing.T) { + echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}})) + outC := eventualOutput(echo) + echo.Run() + _, err := echo.Run() + assert.ErrorIs(t, ErrAlreadyExecuted, err) + assert.Contains(t, <-outC, "hello world") + }) } -// fakeCommand takes a command and (optionally) command args and will execute -// the TestHelperProcess test within the package FakeCommand is called from. -func fakeCommand(t *testing.T, command string, args ...string) *invocation { - cargs := []string{"-test.run=TestHelperProcess", "--", command} - cargs = append(cargs, args...) - env := []string{"GO_WANT_HELPER_PROCESS=1"} - - execution := ExecutionRequest{ - Command: os.Args[0], - Arguments: cargs, - Env: env, - } +// withTestHelper takes an ExecutionRequest and adjusts it to run with the +// test binary. TestMain will handle emulating the command. +func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest { + t.Helper() - c, err := NewInvocation(execution) - require.NoError(t, err) - cmd, ok := c.(*invocation) - require.True(t, ok) - return cmd + request.Arguments = append([]string{request.Command}, request.Arguments...) + request.Command = os.Args[0] + return request } -func eventualOutput(i Invocation) <-chan string { +//nolint:all +func eventualOutput(i *Execution) <-chan string { out := make(chan string, 1) + stdout, _ := i.Stdout() + stderr, _ := i.Stderr() go func() { - var buf SyncBuffer - defer i.Stdout().Close() - defer i.Stderr().Close() - io.Copy(&buf, i.Stdout()) - io.Copy(&buf, i.Stderr()) + var buf syncBuffer + io.Copy(&buf, stdout) + io.Copy(&buf, stderr) out <- buf.String() close(out) }() return out } + +type syncBuffer struct { + buf bytes.Buffer + mu sync.Mutex +} + +func (s *syncBuffer) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.Write(p) +} + +func (s *syncBuffer) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.String() +} diff --git a/pkg/receiver/jobreceiver/command/doc.go b/pkg/receiver/jobreceiver/command/doc.go deleted file mode 100644 index 0631965fdd..0000000000 --- a/pkg/receiver/jobreceiver/command/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -/* -Package command manages the execution of commands. - -It has been lightly adapted from the `github.com/sensu/sensu-go/command` package -that has been successfully used on the sensu-go's multi-platform agents, intended -to run checks in a Nagios plugin compatible way. -*/ -package command diff --git a/pkg/receiver/jobreceiver/command/proc_group_linux.go b/pkg/receiver/jobreceiver/command/proc_group_linux.go deleted file mode 100644 index 459c3e83e0..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_group_linux.go +++ /dev/null @@ -1,11 +0,0 @@ -package command - -import ( - "os/exec" - "syscall" -) - -// setProcessGroup sets the process group of the command processprocgroup -func setProcessGroup(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pdeathsig: syscall.SIGTERM} -} diff --git a/pkg/receiver/jobreceiver/command/proc_group_unix.go b/pkg/receiver/jobreceiver/command/proc_group_unix.go deleted file mode 100644 index 54d52b7959..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_group_unix.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build !linux && !windows -// +build !linux,!windows - -package command - -import ( - "os/exec" - "syscall" -) - -// setProcessGroup sets the process group of the command process -func setProcessGroup(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} -} diff --git a/pkg/receiver/jobreceiver/command/proc_group_windows.go b/pkg/receiver/jobreceiver/command/proc_group_windows.go deleted file mode 100644 index 478ddc68b1..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_group_windows.go +++ /dev/null @@ -1,11 +0,0 @@ -package command - -import ( - "os/exec" - "syscall" -) - -// setProcessGroup sets the process group of the command process -func setProcessGroup(cmd *exec.Cmd) { - cmd.SysProcAttr.CreationFlags = syscall.CREATE_NEW_PROCESS_GROUP -} diff --git a/pkg/receiver/jobreceiver/command/proc_unix.go b/pkg/receiver/jobreceiver/command/proc_unix.go deleted file mode 100644 index aaa4bac7fb..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_unix.go +++ /dev/null @@ -1,20 +0,0 @@ -//go:build !windows -// +build !windows - -package command - -import ( - "context" - "os/exec" - "syscall" -) - -// command returns a command to execute an executable -func command(ctx context.Context, command string, args []string) *exec.Cmd { - return exec.CommandContext(ctx, command, args...) -} - -// killProcess kills the command process and any child processes -func killProcess(cmd *exec.Cmd) error { - return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) -} diff --git a/pkg/receiver/jobreceiver/command/proc_windows.go b/pkg/receiver/jobreceiver/command/proc_windows.go deleted file mode 100644 index de7db3b87a..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_windows.go +++ /dev/null @@ -1,60 +0,0 @@ -//go:build windows -// +build windows - -package command - -import ( - "context" - "fmt" - "os" - "os/exec" - "strings" - "syscall" -) - -// command returns a command to execute an executable -func command(ctx context.Context, command string, args []string) *exec.Cmd { - cmd := exec.CommandContext(ctx, "cmd") - cl := append([]string{command}, args...) - cmd.SysProcAttr = &syscall.SysProcAttr{ - // Manually set the command line arguments so they are not escaped - // https://github.com/golang/go/commit/f18a4e9609aac3aa83d40920c12b9b45f9376aea - // http://www.josephspurrier.com/prevent-escaping-exec-command-arguments-in-go/ - CmdLine: strings.Join(cl, " "), - } - return cmd -} - -// killProcess kills the command process and any child processes -func killProcess(cmd *exec.Cmd) error { - process := cmd.Process - if process == nil { - return nil - } - - args := []string{fmt.Sprintf("/T /F /PID %d", process.Pid)} - err := command(context.Background(), "taskkill", args).Run() - if err == nil { - return nil - } - - err = forceKill(process) - if err == nil { - return nil - } - err = process.Signal(os.Kill) - - return fmt.Errorf("could not kill process") -} - -func forceKill(process *os.Process) error { - handle, err := syscall.OpenProcess(syscall.PROCESS_TERMINATE, true, uint32(process.Pid)) - if err != nil { - return err - } - - err = syscall.TerminateProcess(handle, 0) - _ = syscall.CloseHandle(handle) - - return err -} diff --git a/pkg/receiver/jobreceiver/output/consumer/consumer.go b/pkg/receiver/jobreceiver/output/consumer/consumer.go index a47003d249..8ce91f9be6 100644 --- a/pkg/receiver/jobreceiver/output/consumer/consumer.go +++ b/pkg/receiver/jobreceiver/output/consumer/consumer.go @@ -13,7 +13,7 @@ import ( // Interface consumes command output and emits telemetry data type Interface interface { - Consume(ctx context.Context, stdin, stderr io.ReadCloser) CloseFunc + Consume(ctx context.Context, stdin, stderr io.Reader) CloseFunc } // CloseFunc @@ -45,12 +45,10 @@ type DemoConsumer struct { } // Consume reads stdout line by line and produces entries -func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.ReadCloser) CloseFunc { +func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.Reader) CloseFunc { ctx, cancel := context.WithCancel(ctx) - stderr.Close() go func() { scanner := bufio.NewScanner(stdout) - defer stdout.Close() for { select { case <-ctx.Done(): diff --git a/pkg/receiver/jobreceiver/runner.go b/pkg/receiver/jobreceiver/runner.go index 0257e69fd6..269afdae53 100644 --- a/pkg/receiver/jobreceiver/runner.go +++ b/pkg/receiver/jobreceiver/runner.go @@ -29,18 +29,23 @@ type stubRunner struct { func (r *stubRunner) Start(operator.Persister) error { go func() { ctx := context.Background() - cmd, err := command.NewInvocation(command.ExecutionRequest{ + cmd := command.NewExecution(ctx, command.ExecutionRequest{ Command: r.Exec.Command, Arguments: r.Exec.Arguments, Timeout: r.Exec.Timeout, }) + stdout, err := cmd.Stdout() if err != nil { panic(err) } - cb := r.Consumer.Consume(ctx, cmd.Stdout(), cmd.Stderr()) + stderr, err := cmd.Stderr() + if err != nil { + panic(err) + } + cb := r.Consumer.Consume(ctx, stdout, stderr) - resp, err := cmd.Run(ctx) + resp, err := cmd.Run() if err != nil { panic(err) }