From 847e57a1add81df208cc3ce03104c4eaacc0c3bd Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Wed, 23 Aug 2023 14:01:11 -0700 Subject: [PATCH 1/8] [pkg/receiver/jobreceiver] Add command package Adds a new jobreceiver/command package that encapsulates the handling of executing commands. Mostly borrowed from sensu-go's robust commands package used by it's multi-platform agents. Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/buffer.go | 25 +++ pkg/receiver/jobreceiver/command/command.go | 207 ++++++++++++++++++ .../jobreceiver/command/command_test.go | 152 +++++++++++++ pkg/receiver/jobreceiver/command/doc.go | 8 + .../jobreceiver/command/proc_group_linux.go | 11 + .../jobreceiver/command/proc_group_unix.go | 14 ++ .../jobreceiver/command/proc_group_windows.go | 11 + pkg/receiver/jobreceiver/command/proc_unix.go | 20 ++ .../jobreceiver/command/proc_windows.go | 60 +++++ .../jobreceiver/output/consumer/consumer.go | 11 +- pkg/receiver/jobreceiver/runner.go | 38 +++- pkg/receiver/jobreceiver/testdata/usage.yaml | 20 +- 12 files changed, 556 insertions(+), 21 deletions(-) create mode 100644 pkg/receiver/jobreceiver/command/buffer.go create mode 100644 pkg/receiver/jobreceiver/command/command.go create mode 100644 pkg/receiver/jobreceiver/command/command_test.go create mode 100644 pkg/receiver/jobreceiver/command/doc.go create mode 100644 pkg/receiver/jobreceiver/command/proc_group_linux.go create mode 100644 pkg/receiver/jobreceiver/command/proc_group_unix.go create mode 100644 pkg/receiver/jobreceiver/command/proc_group_windows.go create mode 100644 pkg/receiver/jobreceiver/command/proc_unix.go create mode 100644 pkg/receiver/jobreceiver/command/proc_windows.go diff --git a/pkg/receiver/jobreceiver/command/buffer.go b/pkg/receiver/jobreceiver/command/buffer.go new file mode 100644 index 0000000000..6e592a56b0 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/buffer.go @@ -0,0 +1,25 @@ +package command + +import ( + "bytes" + "sync" +) + +// SyncBuffer can be used to buffer both output streams to +// in a monitoring plugin spec compliant way. +type SyncBuffer struct { + buf bytes.Buffer + mu sync.Mutex +} + +func (s *SyncBuffer) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.Write(p) +} + +func (s *SyncBuffer) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.String() +} diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go new file mode 100644 index 0000000000..ea4d2bbe4e --- /dev/null +++ b/pkg/receiver/jobreceiver/command/command.go @@ -0,0 +1,207 @@ +package command + +import ( + "context" + "fmt" + "io" + "math" + "os" + "os/exec" + "sync" + "syscall" + "time" +) + +const ( + // TimeoutOutput specifies the command execution output in the + // event of an execution timeout. + TimeoutOutput string = "Execution timed out\n" + + // OKExitStatus specifies the command execution exit status + // that indicates a success, A-OK. + OKExitStatus int = 0 + + // TimeoutExitStatus specifies the command execution exit + // status in the event of an execution timeout. + TimeoutExitStatus int = 2 + + // FallbackExitStatus specifies the command execution exit + // status used when golang is unable to determine the exit + // status. + FallbackExitStatus int = 3 + + // ErrAlreadyExecuted don't do that. + ErrAlreadyExecuted = cError("invocation has already been started once") +) + +// ExecutionRequest provides information about a system command execution, +// somewhat of an abstraction intended to be used for monitoring jobs +type ExecutionRequest struct { + // Command is the command to be executed. + Command string + + // Arguments to execute the command with. + Arguments []string + + // Env ... + Env []string + + // Execution timeout + Timeout time.Duration +} + +type Invocation interface { + // Stdout stream. Must by closed by the caller. + Stdout() io.ReadCloser + // Stderr stream. Must by closed by the caller. + Stderr() io.ReadCloser + // Run the command. Only valid once + Run(context.Context) (ExecutionResponse, error) +} + +// NewInvocation based on an ExecutionRequest +func NewInvocation(request ExecutionRequest) (Invocation, error) { + c := &invocation{ExecutionRequest: request} + var err error + c.stdoutReader, c.stdoutWritter, err = os.Pipe() + if err != nil { + return c, err + } + c.stderrReader, c.stderrWritter, err = os.Pipe() + return c, err +} + +type invocation struct { + ExecutionRequest + + stdoutWritter io.WriteCloser + stderrWritter io.WriteCloser + stdoutReader io.ReadCloser + stderrReader io.ReadCloser + + mu sync.Mutex + started bool +} + +func (c *invocation) Stdout() io.ReadCloser { + return c.stdoutReader +} +func (c *invocation) Stderr() io.ReadCloser { + return c.stderrReader +} +func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) { + resp := ExecutionResponse{} + + c.mu.Lock() + if c.started { + defer c.mu.Unlock() + return resp, ErrAlreadyExecuted + } + c.started = true + c.mu.Unlock() + + // Using a platform specific shell to "cheat", as the shell + // will handle certain failures for us, where golang exec is + // known to have troubles, e.g. command not found. We still + // use a fallback exit status in the unlikely event that the + // exit status cannot be determined. + var cmd *exec.Cmd + + // Use context.WithCancel for command execution timeout. + // context.WithTimeout will not kill child/grandchild processes + // (see issues tagged in https://github.com/sensu/sensu-go/issues/781). + // Rather, we will use a timer, CancelFunc and proc functions + // to perform full cleanup. + ctx, timeout := context.WithCancel(ctx) + defer timeout() + + // Taken from Sensu-Spawn (Sensu 1.x.x). + cmd = command(ctx, c.Command, c.Arguments) + + // Set the ENV for the command if it is set + if len(c.Env) > 0 { + cmd.Env = c.Env + } + + cmd.Stdout = c.stdoutWritter + cmd.Stderr = c.stderrWritter + defer func() { + c.stdoutWritter.Close() + c.stderrWritter.Close() + }() + + started := time.Now() + defer func() { + resp.Duration = time.Since(started) + }() + + timer := time.NewTimer(math.MaxInt64) + defer timer.Stop() + if c.Timeout > 0 { + setProcessGroup(cmd) + timer.Stop() + timer = time.NewTimer(c.Timeout) + } + if err := cmd.Start(); err != nil { + // Something unexpected happened when attempting to + // fork/exec, return immediately. + return resp, err + } + + waitCh := make(chan struct{}) + var err error + go func() { + err = cmd.Wait() + close(waitCh) + }() + + // Wait for the process to complete or the timer to trigger, whichever comes first. + var killErr error + select { + case <-waitCh: + if err != nil { + // The command most likely return a non-zero exit status. + if exitError, ok := err.(*exec.ExitError); ok { + // Best effort to determine the exit status, this + // should work on Linux, OSX, and Windows. + if status, ok := exitError.Sys().(syscall.WaitStatus); ok { + resp.Status = status.ExitStatus() + } else { + resp.Status = FallbackExitStatus + } + } else { + resp.Status = FallbackExitStatus + } + } else { + // Everything is A-OK. + resp.Status = OKExitStatus + } + + case <-timer.C: + var killErrOutput string + if killErr = killProcess(cmd); killErr != nil { + killErrOutput = fmt.Sprintf("Unable to TERM/KILL the process: #%d\n", cmd.Process.Pid) + } + timeout() + fmt.Fprintf(c.stderrWritter, "%s%s", TimeoutOutput, killErrOutput) + resp.Status = TimeoutExitStatus + } + + return resp, nil +} + +// ExecutionResponse provides the response information of an ExecutionRequest. +type ExecutionResponse struct { + // Command execution exit status. + Status int + + // Duration provides command execution time. + Duration time.Duration +} + +// cError const error type for sentinels +type cError string + +func (e cError) Error() string { + return string(e) +} diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go new file mode 100644 index 0000000000..803c6fe5b8 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/command_test.go @@ -0,0 +1,152 @@ +package command + +import ( + "context" + "fmt" + "io" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestHelperProcess is a target that helps this suite emulate +// platform specific executables in a way that is platform agnostic. +func TestHelperProcess(t *testing.T) { + if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { + return + } + var args []string + command := os.Args[3] + if len(os.Args) > 4 { + args = os.Args[4:] + } + + switch command { + case "echo": + fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " ")) + case "exit": + if i, err := strconv.ParseInt(args[0], 10, 32); err == nil { + os.Exit(int(i)) + } + panic("unexpected exit argument") + case "sleep": + if d, err := time.ParseDuration(args[0]); err == nil { + time.Sleep(d) + return + } + if i, err := strconv.ParseInt(args[0], 10, 64); err == nil { + time.Sleep(time.Second * time.Duration(i)) + return + } + } + +} + +//nolint:all +func TestExecute(t *testing.T) { + + ctx := context.Background() + // Basic command execution + echo := fakeCommand(t, "echo", "hello", "world") + outC := eventualOutput(echo) + resp, err := echo.Run(ctx) + require.NoError(t, err) + assert.Equal(t, 0, resp.Status) + require.NoError(t, err) + assert.Contains(t, <-outC, "hello world") + + // Command exits non-zero + exitCmd := fakeCommand(t, "exit", "1") + eventualOutput(exitCmd) + resp, err = exitCmd.Run(ctx) + require.NoError(t, err) + assert.Equal(t, 1, resp.Status) + exitCmd = fakeCommand(t, "exit", "33") + eventualOutput(exitCmd) + resp, err = exitCmd.Run(ctx) + require.NoError(t, err) + assert.Equal(t, 33, resp.Status) + + // Command canceled by context + timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100) + sleepCmd := fakeCommand(t, "sleep", "1m") + eventualOutput(sleepCmd) + done := make(chan struct{}) + go func() { + resp, err = sleepCmd.Run(timeoutCtx) + require.NoError(t, err) + close(done) + }() + select { + case <-time.After(time.Second): + t.Errorf("command context expired but was not killed") + case <-done: + // okay + } + cancel() + + // Command exceeds timeout + sleepCmd = fakeCommand(t, "sleep", "1m") + sleepCmd.Timeout = time.Millisecond * 100 + eventualOutput(sleepCmd) + done = make(chan struct{}) + go func() { + resp, err := sleepCmd.Run(ctx) + assert.NoError(t, err) + assert.Equal(t, TimeoutExitStatus, resp.Status) + close(done) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("command timeout exceeded but was not killed") + case <-done: + // okay + } + + // Invocation cannot be spuriously re-invoked + echo = fakeCommand(t, "echo", "hello", "world") + outC = eventualOutput(echo) + echo.Run(ctx) + _, err = echo.Run(ctx) + assert.ErrorIs(t, ErrAlreadyExecuted, err) + assert.Contains(t, <-outC, "hello world") +} + +// fakeCommand takes a command and (optionally) command args and will execute +// the TestHelperProcess test within the package FakeCommand is called from. +func fakeCommand(t *testing.T, command string, args ...string) *invocation { + cargs := []string{"-test.run=TestHelperProcess", "--", command} + cargs = append(cargs, args...) + env := []string{"GO_WANT_HELPER_PROCESS=1"} + + execution := ExecutionRequest{ + Command: os.Args[0], + Arguments: cargs, + Env: env, + } + + c, err := NewInvocation(execution) + require.NoError(t, err) + cmd, ok := c.(*invocation) + require.True(t, ok) + return cmd +} + +func eventualOutput(i Invocation) <-chan string { + out := make(chan string, 1) + go func() { + var buf SyncBuffer + defer i.Stdout().Close() + defer i.Stderr().Close() + io.Copy(&buf, i.Stdout()) + io.Copy(&buf, i.Stderr()) + out <- buf.String() + close(out) + }() + return out +} diff --git a/pkg/receiver/jobreceiver/command/doc.go b/pkg/receiver/jobreceiver/command/doc.go new file mode 100644 index 0000000000..0631965fdd --- /dev/null +++ b/pkg/receiver/jobreceiver/command/doc.go @@ -0,0 +1,8 @@ +/* +Package command manages the execution of commands. + +It has been lightly adapted from the `github.com/sensu/sensu-go/command` package +that has been successfully used on the sensu-go's multi-platform agents, intended +to run checks in a Nagios plugin compatible way. +*/ +package command diff --git a/pkg/receiver/jobreceiver/command/proc_group_linux.go b/pkg/receiver/jobreceiver/command/proc_group_linux.go new file mode 100644 index 0000000000..459c3e83e0 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/proc_group_linux.go @@ -0,0 +1,11 @@ +package command + +import ( + "os/exec" + "syscall" +) + +// setProcessGroup sets the process group of the command processprocgroup +func setProcessGroup(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pdeathsig: syscall.SIGTERM} +} diff --git a/pkg/receiver/jobreceiver/command/proc_group_unix.go b/pkg/receiver/jobreceiver/command/proc_group_unix.go new file mode 100644 index 0000000000..54d52b7959 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/proc_group_unix.go @@ -0,0 +1,14 @@ +//go:build !linux && !windows +// +build !linux,!windows + +package command + +import ( + "os/exec" + "syscall" +) + +// setProcessGroup sets the process group of the command process +func setProcessGroup(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} +} diff --git a/pkg/receiver/jobreceiver/command/proc_group_windows.go b/pkg/receiver/jobreceiver/command/proc_group_windows.go new file mode 100644 index 0000000000..478ddc68b1 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/proc_group_windows.go @@ -0,0 +1,11 @@ +package command + +import ( + "os/exec" + "syscall" +) + +// setProcessGroup sets the process group of the command process +func setProcessGroup(cmd *exec.Cmd) { + cmd.SysProcAttr.CreationFlags = syscall.CREATE_NEW_PROCESS_GROUP +} diff --git a/pkg/receiver/jobreceiver/command/proc_unix.go b/pkg/receiver/jobreceiver/command/proc_unix.go new file mode 100644 index 0000000000..aaa4bac7fb --- /dev/null +++ b/pkg/receiver/jobreceiver/command/proc_unix.go @@ -0,0 +1,20 @@ +//go:build !windows +// +build !windows + +package command + +import ( + "context" + "os/exec" + "syscall" +) + +// command returns a command to execute an executable +func command(ctx context.Context, command string, args []string) *exec.Cmd { + return exec.CommandContext(ctx, command, args...) +} + +// killProcess kills the command process and any child processes +func killProcess(cmd *exec.Cmd) error { + return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) +} diff --git a/pkg/receiver/jobreceiver/command/proc_windows.go b/pkg/receiver/jobreceiver/command/proc_windows.go new file mode 100644 index 0000000000..de7db3b87a --- /dev/null +++ b/pkg/receiver/jobreceiver/command/proc_windows.go @@ -0,0 +1,60 @@ +//go:build windows +// +build windows + +package command + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + "syscall" +) + +// command returns a command to execute an executable +func command(ctx context.Context, command string, args []string) *exec.Cmd { + cmd := exec.CommandContext(ctx, "cmd") + cl := append([]string{command}, args...) + cmd.SysProcAttr = &syscall.SysProcAttr{ + // Manually set the command line arguments so they are not escaped + // https://github.com/golang/go/commit/f18a4e9609aac3aa83d40920c12b9b45f9376aea + // http://www.josephspurrier.com/prevent-escaping-exec-command-arguments-in-go/ + CmdLine: strings.Join(cl, " "), + } + return cmd +} + +// killProcess kills the command process and any child processes +func killProcess(cmd *exec.Cmd) error { + process := cmd.Process + if process == nil { + return nil + } + + args := []string{fmt.Sprintf("/T /F /PID %d", process.Pid)} + err := command(context.Background(), "taskkill", args).Run() + if err == nil { + return nil + } + + err = forceKill(process) + if err == nil { + return nil + } + err = process.Signal(os.Kill) + + return fmt.Errorf("could not kill process") +} + +func forceKill(process *os.Process) error { + handle, err := syscall.OpenProcess(syscall.PROCESS_TERMINATE, true, uint32(process.Pid)) + if err != nil { + return err + } + + err = syscall.TerminateProcess(handle, 0) + _ = syscall.CloseHandle(handle) + + return err +} diff --git a/pkg/receiver/jobreceiver/output/consumer/consumer.go b/pkg/receiver/jobreceiver/output/consumer/consumer.go index 79c72bba75..a47003d249 100644 --- a/pkg/receiver/jobreceiver/output/consumer/consumer.go +++ b/pkg/receiver/jobreceiver/output/consumer/consumer.go @@ -13,7 +13,7 @@ import ( // Interface consumes command output and emits telemetry data type Interface interface { - Consume(stdin, stderr io.Reader) CloseFunc + Consume(ctx context.Context, stdin, stderr io.ReadCloser) CloseFunc } // CloseFunc @@ -45,10 +45,12 @@ type DemoConsumer struct { } // Consume reads stdout line by line and produces entries -func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc { - ctx, cancel := context.WithCancel(context.Background()) +func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.ReadCloser) CloseFunc { + ctx, cancel := context.WithCancel(ctx) + stderr.Close() go func() { scanner := bufio.NewScanner(stdout) + defer stdout.Close() for { select { case <-ctx.Done(): @@ -56,6 +58,9 @@ func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc { default: } if !scanner.Scan() { + if scanner.Err() != nil { + panic(scanner.Err()) + } return } ent, err := p.NewEntry(scanner.Text()) diff --git a/pkg/receiver/jobreceiver/runner.go b/pkg/receiver/jobreceiver/runner.go index 9e9472931d..0257e69fd6 100644 --- a/pkg/receiver/jobreceiver/runner.go +++ b/pkg/receiver/jobreceiver/runner.go @@ -1,10 +1,10 @@ package jobreceiver import ( - "bytes" - "fmt" + "context" "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/builder" + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/command" "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "go.uber.org/zap" @@ -13,18 +13,44 @@ import ( // Build returns the job runner, the process responsible for scheduling and // running commands and piping their output to the output consumer. func (c Config) Build(logger *zap.SugaredLogger, out consumer.Interface) (builder.JobRunner, error) { - return &stubRunner{Consumer: out}, nil + return &stubRunner{ + Exec: c.Exec, + Consumer: out, + }, nil } // stubRunner is a stub implementation. type stubRunner struct { + Exec ExecutionConfig Consumer consumer.Interface } +// Start stub impl. runs command once at startup then idles indefinitely. func (r *stubRunner) Start(operator.Persister) error { - var buf bytes.Buffer - fmt.Fprint(&buf, "hello world. This is a placeholder message.") - r.Consumer.Consume(&buf, &bytes.Buffer{}) + go func() { + ctx := context.Background() + cmd, err := command.NewInvocation(command.ExecutionRequest{ + Command: r.Exec.Command, + Arguments: r.Exec.Arguments, + Timeout: r.Exec.Timeout, + }) + + if err != nil { + panic(err) + } + cb := r.Consumer.Consume(ctx, cmd.Stdout(), cmd.Stderr()) + + resp, err := cmd.Run(ctx) + if err != nil { + panic(err) + } + cb(consumer.ExecutionSummary{ + Command: r.Exec.Command, + ExitCode: resp.Status, + RunDuration: resp.Duration, + }) + + }() return nil } diff --git a/pkg/receiver/jobreceiver/testdata/usage.yaml b/pkg/receiver/jobreceiver/testdata/usage.yaml index 9963505b18..2f5e904a74 100644 --- a/pkg/receiver/jobreceiver/testdata/usage.yaml +++ b/pkg/receiver/jobreceiver/testdata/usage.yaml @@ -34,9 +34,11 @@ receivers: schedule: interval: 1h exec: - command: echo + command: /bin/sh + timeout: 1m arguments: - - "hello world" + - '-c' + - 'for i in {0..120}; do date -Iseconds --date "90 seconds ago" && sleep 1; done;' output: type: event event: @@ -49,22 +51,16 @@ receivers: resource: bingo: bango operators: - - type: regex_parser + - type: time_parser parse_from: body - regex: '^(?P\w+).*$' - - type: severity_parser - parse_from: attributes.first - mapping: - error: total - warn: - - nonsense - - '.' - - hello + layout_type: gotime + layout: '2006-01-02T15:04:05-07:00' exporters: logging: verbosity: detailed + sampling_initial: 32 processors: memory_limiter: From c407b49db08a8333aeaa1a3830f1cd18a0da31a6 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Tue, 29 Aug 2023 10:05:42 -0700 Subject: [PATCH 2/8] simplify command package Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/buffer.go | 25 --- pkg/receiver/jobreceiver/command/command.go | 163 +++++--------- .../jobreceiver/command/command_test.go | 211 ++++++++++-------- pkg/receiver/jobreceiver/command/doc.go | 8 - .../jobreceiver/command/proc_group_linux.go | 11 - .../jobreceiver/command/proc_group_unix.go | 14 -- .../jobreceiver/command/proc_group_windows.go | 11 - pkg/receiver/jobreceiver/command/proc_unix.go | 20 -- .../jobreceiver/command/proc_windows.go | 60 ----- .../jobreceiver/output/consumer/consumer.go | 6 +- pkg/receiver/jobreceiver/runner.go | 11 +- 11 files changed, 181 insertions(+), 359 deletions(-) delete mode 100644 pkg/receiver/jobreceiver/command/buffer.go delete mode 100644 pkg/receiver/jobreceiver/command/doc.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_group_linux.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_group_unix.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_group_windows.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_unix.go delete mode 100644 pkg/receiver/jobreceiver/command/proc_windows.go diff --git a/pkg/receiver/jobreceiver/command/buffer.go b/pkg/receiver/jobreceiver/command/buffer.go deleted file mode 100644 index 6e592a56b0..0000000000 --- a/pkg/receiver/jobreceiver/command/buffer.go +++ /dev/null @@ -1,25 +0,0 @@ -package command - -import ( - "bytes" - "sync" -) - -// SyncBuffer can be used to buffer both output streams to -// in a monitoring plugin spec compliant way. -type SyncBuffer struct { - buf bytes.Buffer - mu sync.Mutex -} - -func (s *SyncBuffer) Write(p []byte) (n int, err error) { - s.mu.Lock() - defer s.mu.Unlock() - return s.buf.Write(p) -} - -func (s *SyncBuffer) String() string { - s.mu.Lock() - defer s.mu.Unlock() - return s.buf.String() -} diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go index ea4d2bbe4e..0269798dd4 100644 --- a/pkg/receiver/jobreceiver/command/command.go +++ b/pkg/receiver/jobreceiver/command/command.go @@ -2,10 +2,7 @@ package command import ( "context" - "fmt" "io" - "math" - "os" "os/exec" "sync" "syscall" @@ -13,18 +10,10 @@ import ( ) const ( - // TimeoutOutput specifies the command execution output in the - // event of an execution timeout. - TimeoutOutput string = "Execution timed out\n" - // OKExitStatus specifies the command execution exit status // that indicates a success, A-OK. OKExitStatus int = 0 - // TimeoutExitStatus specifies the command execution exit - // status in the event of an execution timeout. - TimeoutExitStatus int = 2 - // FallbackExitStatus specifies the command execution exit // status used when golang is unable to determine the exit // status. @@ -34,8 +23,7 @@ const ( ErrAlreadyExecuted = cError("invocation has already been started once") ) -// ExecutionRequest provides information about a system command execution, -// somewhat of an abstraction intended to be used for monitoring jobs +// ExecutionRequest type ExecutionRequest struct { // Command is the command to be executed. Command string @@ -43,55 +31,49 @@ type ExecutionRequest struct { // Arguments to execute the command with. Arguments []string - // Env ... + // Env variables to include Env []string // Execution timeout Timeout time.Duration } -type Invocation interface { - // Stdout stream. Must by closed by the caller. - Stdout() io.ReadCloser - // Stderr stream. Must by closed by the caller. - Stderr() io.ReadCloser - // Run the command. Only valid once - Run(context.Context) (ExecutionResponse, error) -} - -// NewInvocation based on an ExecutionRequest -func NewInvocation(request ExecutionRequest) (Invocation, error) { - c := &invocation{ExecutionRequest: request} - var err error - c.stdoutReader, c.stdoutWritter, err = os.Pipe() - if err != nil { - return c, err +// NewExecution based on an ExecutionRequest +func NewExecution(ctx context.Context, request ExecutionRequest) *Execution { + ctx, cancel := context.WithCancel(ctx) + cmd := exec.CommandContext(ctx, request.Command, request.Arguments...) + if len(request.Env) > 0 { + cmd.Env = request.Env + } + return &Execution{ + cmd: cmd, + ctx: ctx, + cancel: cancel, + timeout: request.Timeout, } - c.stderrReader, c.stderrWritter, err = os.Pipe() - return c, err } -type invocation struct { - ExecutionRequest - - stdoutWritter io.WriteCloser - stderrWritter io.WriteCloser - stdoutReader io.ReadCloser - stderrReader io.ReadCloser +type Execution struct { + cmd *exec.Cmd + ctx context.Context + cancel func() + timeout time.Duration mu sync.Mutex started bool } -func (c *invocation) Stdout() io.ReadCloser { - return c.stdoutReader +// Stdout and Stderr return Readers for the underlying execution's output +// streams. Run must be called subsequently or file descriptors may be leaked. +func (c *Execution) Stdout() (io.Reader, error) { + return c.cmd.StdoutPipe() } -func (c *invocation) Stderr() io.ReadCloser { - return c.stderrReader +func (c *Execution) Stderr() (io.Reader, error) { + return c.cmd.StderrPipe() } -func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) { - resp := ExecutionResponse{} +// Run the command. May only be invoked once. +func (c *Execution) Run() (resp ExecutionResponse, err error) { c.mu.Lock() if c.started { defer c.mu.Unlock() @@ -100,91 +82,43 @@ func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) { c.started = true c.mu.Unlock() - // Using a platform specific shell to "cheat", as the shell - // will handle certain failures for us, where golang exec is - // known to have troubles, e.g. command not found. We still - // use a fallback exit status in the unlikely event that the - // exit status cannot be determined. - var cmd *exec.Cmd - - // Use context.WithCancel for command execution timeout. - // context.WithTimeout will not kill child/grandchild processes - // (see issues tagged in https://github.com/sensu/sensu-go/issues/781). - // Rather, we will use a timer, CancelFunc and proc functions - // to perform full cleanup. - ctx, timeout := context.WithCancel(ctx) - defer timeout() - - // Taken from Sensu-Spawn (Sensu 1.x.x). - cmd = command(ctx, c.Command, c.Arguments) - - // Set the ENV for the command if it is set - if len(c.Env) > 0 { - cmd.Env = c.Env - } - - cmd.Stdout = c.stdoutWritter - cmd.Stderr = c.stderrWritter - defer func() { - c.stdoutWritter.Close() - c.stderrWritter.Close() - }() + defer c.cancel() started := time.Now() defer func() { resp.Duration = time.Since(started) }() - timer := time.NewTimer(math.MaxInt64) - defer timer.Stop() - if c.Timeout > 0 { - setProcessGroup(cmd) - timer.Stop() - timer = time.NewTimer(c.Timeout) + if c.timeout > 0 { + time.AfterFunc(c.timeout, c.cancel) } - if err := cmd.Start(); err != nil { + if err := c.cmd.Start(); err != nil { // Something unexpected happened when attempting to // fork/exec, return immediately. return resp, err } - waitCh := make(chan struct{}) - var err error - go func() { - err = cmd.Wait() - close(waitCh) - }() - - // Wait for the process to complete or the timer to trigger, whichever comes first. - var killErr error - select { - case <-waitCh: - if err != nil { - // The command most likely return a non-zero exit status. - if exitError, ok := err.(*exec.ExitError); ok { - // Best effort to determine the exit status, this - // should work on Linux, OSX, and Windows. - if status, ok := exitError.Sys().(syscall.WaitStatus); ok { - resp.Status = status.ExitStatus() - } else { - resp.Status = FallbackExitStatus - } + // Wait for the process to complete then attempt to determine the result + err = c.cmd.Wait() + if err != nil { + // The command most likely return a non-zero exit status. + if exitError, ok := err.(*exec.ExitError); ok { + // Best effort to determine the exit status, this + // should work on Linux, OSX, and Windows. + if status, ok := exitError.Sys().(syscall.WaitStatus); ok { + resp.Status = status.ExitStatus() } else { resp.Status = FallbackExitStatus + resp.Error = exitError } } else { - // Everything is A-OK. - resp.Status = OKExitStatus - } - - case <-timer.C: - var killErrOutput string - if killErr = killProcess(cmd); killErr != nil { - killErrOutput = fmt.Sprintf("Unable to TERM/KILL the process: #%d\n", cmd.Process.Pid) + // Probably an I/O error + resp.Status = FallbackExitStatus + resp.Error = err } - timeout() - fmt.Fprintf(c.stderrWritter, "%s%s", TimeoutOutput, killErrOutput) - resp.Status = TimeoutExitStatus + } else { + // Everything is A-OK. + resp.Status = OKExitStatus } return resp, nil @@ -197,6 +131,9 @@ type ExecutionResponse struct { // Duration provides command execution time. Duration time.Duration + + // Error is passed when the outcome of the execution is uncertain + Error error } // cError const error type for sentinels diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go index 803c6fe5b8..b965f0edb8 100644 --- a/pkg/receiver/jobreceiver/command/command_test.go +++ b/pkg/receiver/jobreceiver/command/command_test.go @@ -1,12 +1,15 @@ package command import ( + "bytes" "context" + "flag" "fmt" "io" "os" "strconv" "strings" + "sync" "testing" "time" @@ -14,18 +17,23 @@ import ( "github.com/stretchr/testify/require" ) -// TestHelperProcess is a target that helps this suite emulate -// platform specific executables in a way that is platform agnostic. -func TestHelperProcess(t *testing.T) { - if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" { - return +// TestMain lets the test binary emulate other processes +func TestMain(m *testing.M) { + flag.Parse() + + pid := os.Getpid() + if os.Getenv("GO_EXEC_TEST_PID") == "" { + os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid)) + os.Exit(m.Run()) } - var args []string - command := os.Args[3] - if len(os.Args) > 4 { - args = os.Args[4:] + + args := flag.Args() + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "No command\n") + os.Exit(2) } + command, args := args[0], args[1:] switch command { case "echo": fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " ")) @@ -43,110 +51,133 @@ func TestHelperProcess(t *testing.T) { time.Sleep(time.Second * time.Duration(i)) return } + case "fork": + childCommand := NewExecution(context.Background(), ExecutionRequest{ + Command: os.Args[0], + Arguments: args, + }) + _, err := childCommand.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "fork error: %v", err) + os.Exit(3) + } } - } //nolint:all func TestExecute(t *testing.T) { - ctx := context.Background() // Basic command execution - echo := fakeCommand(t, "echo", "hello", "world") - outC := eventualOutput(echo) - resp, err := echo.Run(ctx) - require.NoError(t, err) - assert.Equal(t, 0, resp.Status) - require.NoError(t, err) - assert.Contains(t, <-outC, "hello world") + t.Run("basic", func(t *testing.T) { + echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}})) + outC := eventualOutput(echo) + resp, err := echo.Run() + require.NoError(t, err) + assert.Equal(t, 0, resp.Status) + require.NoError(t, err) + assert.Contains(t, <-outC, "hello world") + }) // Command exits non-zero - exitCmd := fakeCommand(t, "exit", "1") - eventualOutput(exitCmd) - resp, err = exitCmd.Run(ctx) - require.NoError(t, err) - assert.Equal(t, 1, resp.Status) - exitCmd = fakeCommand(t, "exit", "33") - eventualOutput(exitCmd) - resp, err = exitCmd.Run(ctx) - require.NoError(t, err) - assert.Equal(t, 33, resp.Status) - - // Command canceled by context - timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100) - sleepCmd := fakeCommand(t, "sleep", "1m") - eventualOutput(sleepCmd) - done := make(chan struct{}) - go func() { - resp, err = sleepCmd.Run(timeoutCtx) + t.Run("exit code 1", func(t *testing.T) { + exitCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"1"}})) + resp, err := exitCmd.Run() require.NoError(t, err) - close(done) - }() - select { - case <-time.After(time.Second): - t.Errorf("command context expired but was not killed") - case <-done: - // okay - } - cancel() + assert.Equal(t, 1, resp.Status) + exitCmd = NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"33"}})) + resp, err = exitCmd.Run() + require.NoError(t, err) + assert.Equal(t, 33, resp.Status) + }) // Command exceeds timeout - sleepCmd = fakeCommand(t, "sleep", "1m") - sleepCmd.Timeout = time.Millisecond * 100 - eventualOutput(sleepCmd) - done = make(chan struct{}) - go func() { - resp, err := sleepCmd.Run(ctx) - assert.NoError(t, err) - assert.Equal(t, TimeoutExitStatus, resp.Status) - close(done) - }() - select { - case <-time.After(5 * time.Second): - t.Errorf("command timeout exceeded but was not killed") - case <-done: - // okay - } + t.Run("exceeds timeout", func(t *testing.T) { + timeout := time.Millisecond * 100 + sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "sleep", Arguments: []string{"1m"}, Timeout: timeout})) + done := make(chan struct{}) + go func() { + resp, err := sleepCmd.Run() + assert.NoError(t, err) + assert.NotEqual(t, OKExitStatus, resp.Status) + assert.LessOrEqual(t, timeout, resp.Duration) + close(done) + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("command timeout exceeded but was not killed") + case <-done: + // okay + } + }) + // Command exceeds timeout with child process + t.Run("exceeds timeout with child", func(t *testing.T) { + timeout := time.Millisecond * 100 + sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "fork", Arguments: []string{"sleep", "1m"}, Timeout: timeout})) + done := make(chan struct{}) + go func() { + resp, err := sleepCmd.Run() + assert.NoError(t, err) + assert.NotEqual(t, OKExitStatus, resp.Status) + assert.LessOrEqual(t, timeout, resp.Duration) + close(done) + }() + select { + case <-time.After(5 * time.Second): + t.Fatal("command timeout exceeded but was not killed") + case <-done: + // okay + } + }) // Invocation cannot be spuriously re-invoked - echo = fakeCommand(t, "echo", "hello", "world") - outC = eventualOutput(echo) - echo.Run(ctx) - _, err = echo.Run(ctx) - assert.ErrorIs(t, ErrAlreadyExecuted, err) - assert.Contains(t, <-outC, "hello world") + t.Run("cannot be ran twice", func(t *testing.T) { + echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}})) + outC := eventualOutput(echo) + echo.Run() + _, err := echo.Run() + assert.ErrorIs(t, ErrAlreadyExecuted, err) + assert.Contains(t, <-outC, "hello world") + }) } -// fakeCommand takes a command and (optionally) command args and will execute -// the TestHelperProcess test within the package FakeCommand is called from. -func fakeCommand(t *testing.T, command string, args ...string) *invocation { - cargs := []string{"-test.run=TestHelperProcess", "--", command} - cargs = append(cargs, args...) - env := []string{"GO_WANT_HELPER_PROCESS=1"} - - execution := ExecutionRequest{ - Command: os.Args[0], - Arguments: cargs, - Env: env, - } +// withTestHelper takes an ExecutionRequest and adjusts it to run with the +// test binary. TestMain will handle emulating the command. +func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest { + t.Helper() - c, err := NewInvocation(execution) - require.NoError(t, err) - cmd, ok := c.(*invocation) - require.True(t, ok) - return cmd + request.Arguments = append([]string{request.Command}, request.Arguments...) + request.Command = os.Args[0] + return request } -func eventualOutput(i Invocation) <-chan string { +//nolint:all +func eventualOutput(i *Execution) <-chan string { out := make(chan string, 1) + stdout, _ := i.Stdout() + stderr, _ := i.Stderr() go func() { - var buf SyncBuffer - defer i.Stdout().Close() - defer i.Stderr().Close() - io.Copy(&buf, i.Stdout()) - io.Copy(&buf, i.Stderr()) + var buf syncBuffer + io.Copy(&buf, stdout) + io.Copy(&buf, stderr) out <- buf.String() close(out) }() return out } + +type syncBuffer struct { + buf bytes.Buffer + mu sync.Mutex +} + +func (s *syncBuffer) Write(p []byte) (n int, err error) { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.Write(p) +} + +func (s *syncBuffer) String() string { + s.mu.Lock() + defer s.mu.Unlock() + return s.buf.String() +} diff --git a/pkg/receiver/jobreceiver/command/doc.go b/pkg/receiver/jobreceiver/command/doc.go deleted file mode 100644 index 0631965fdd..0000000000 --- a/pkg/receiver/jobreceiver/command/doc.go +++ /dev/null @@ -1,8 +0,0 @@ -/* -Package command manages the execution of commands. - -It has been lightly adapted from the `github.com/sensu/sensu-go/command` package -that has been successfully used on the sensu-go's multi-platform agents, intended -to run checks in a Nagios plugin compatible way. -*/ -package command diff --git a/pkg/receiver/jobreceiver/command/proc_group_linux.go b/pkg/receiver/jobreceiver/command/proc_group_linux.go deleted file mode 100644 index 459c3e83e0..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_group_linux.go +++ /dev/null @@ -1,11 +0,0 @@ -package command - -import ( - "os/exec" - "syscall" -) - -// setProcessGroup sets the process group of the command processprocgroup -func setProcessGroup(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pdeathsig: syscall.SIGTERM} -} diff --git a/pkg/receiver/jobreceiver/command/proc_group_unix.go b/pkg/receiver/jobreceiver/command/proc_group_unix.go deleted file mode 100644 index 54d52b7959..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_group_unix.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build !linux && !windows -// +build !linux,!windows - -package command - -import ( - "os/exec" - "syscall" -) - -// setProcessGroup sets the process group of the command process -func setProcessGroup(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} -} diff --git a/pkg/receiver/jobreceiver/command/proc_group_windows.go b/pkg/receiver/jobreceiver/command/proc_group_windows.go deleted file mode 100644 index 478ddc68b1..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_group_windows.go +++ /dev/null @@ -1,11 +0,0 @@ -package command - -import ( - "os/exec" - "syscall" -) - -// setProcessGroup sets the process group of the command process -func setProcessGroup(cmd *exec.Cmd) { - cmd.SysProcAttr.CreationFlags = syscall.CREATE_NEW_PROCESS_GROUP -} diff --git a/pkg/receiver/jobreceiver/command/proc_unix.go b/pkg/receiver/jobreceiver/command/proc_unix.go deleted file mode 100644 index aaa4bac7fb..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_unix.go +++ /dev/null @@ -1,20 +0,0 @@ -//go:build !windows -// +build !windows - -package command - -import ( - "context" - "os/exec" - "syscall" -) - -// command returns a command to execute an executable -func command(ctx context.Context, command string, args []string) *exec.Cmd { - return exec.CommandContext(ctx, command, args...) -} - -// killProcess kills the command process and any child processes -func killProcess(cmd *exec.Cmd) error { - return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) -} diff --git a/pkg/receiver/jobreceiver/command/proc_windows.go b/pkg/receiver/jobreceiver/command/proc_windows.go deleted file mode 100644 index de7db3b87a..0000000000 --- a/pkg/receiver/jobreceiver/command/proc_windows.go +++ /dev/null @@ -1,60 +0,0 @@ -//go:build windows -// +build windows - -package command - -import ( - "context" - "fmt" - "os" - "os/exec" - "strings" - "syscall" -) - -// command returns a command to execute an executable -func command(ctx context.Context, command string, args []string) *exec.Cmd { - cmd := exec.CommandContext(ctx, "cmd") - cl := append([]string{command}, args...) - cmd.SysProcAttr = &syscall.SysProcAttr{ - // Manually set the command line arguments so they are not escaped - // https://github.com/golang/go/commit/f18a4e9609aac3aa83d40920c12b9b45f9376aea - // http://www.josephspurrier.com/prevent-escaping-exec-command-arguments-in-go/ - CmdLine: strings.Join(cl, " "), - } - return cmd -} - -// killProcess kills the command process and any child processes -func killProcess(cmd *exec.Cmd) error { - process := cmd.Process - if process == nil { - return nil - } - - args := []string{fmt.Sprintf("/T /F /PID %d", process.Pid)} - err := command(context.Background(), "taskkill", args).Run() - if err == nil { - return nil - } - - err = forceKill(process) - if err == nil { - return nil - } - err = process.Signal(os.Kill) - - return fmt.Errorf("could not kill process") -} - -func forceKill(process *os.Process) error { - handle, err := syscall.OpenProcess(syscall.PROCESS_TERMINATE, true, uint32(process.Pid)) - if err != nil { - return err - } - - err = syscall.TerminateProcess(handle, 0) - _ = syscall.CloseHandle(handle) - - return err -} diff --git a/pkg/receiver/jobreceiver/output/consumer/consumer.go b/pkg/receiver/jobreceiver/output/consumer/consumer.go index a47003d249..8ce91f9be6 100644 --- a/pkg/receiver/jobreceiver/output/consumer/consumer.go +++ b/pkg/receiver/jobreceiver/output/consumer/consumer.go @@ -13,7 +13,7 @@ import ( // Interface consumes command output and emits telemetry data type Interface interface { - Consume(ctx context.Context, stdin, stderr io.ReadCloser) CloseFunc + Consume(ctx context.Context, stdin, stderr io.Reader) CloseFunc } // CloseFunc @@ -45,12 +45,10 @@ type DemoConsumer struct { } // Consume reads stdout line by line and produces entries -func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.ReadCloser) CloseFunc { +func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.Reader) CloseFunc { ctx, cancel := context.WithCancel(ctx) - stderr.Close() go func() { scanner := bufio.NewScanner(stdout) - defer stdout.Close() for { select { case <-ctx.Done(): diff --git a/pkg/receiver/jobreceiver/runner.go b/pkg/receiver/jobreceiver/runner.go index 0257e69fd6..269afdae53 100644 --- a/pkg/receiver/jobreceiver/runner.go +++ b/pkg/receiver/jobreceiver/runner.go @@ -29,18 +29,23 @@ type stubRunner struct { func (r *stubRunner) Start(operator.Persister) error { go func() { ctx := context.Background() - cmd, err := command.NewInvocation(command.ExecutionRequest{ + cmd := command.NewExecution(ctx, command.ExecutionRequest{ Command: r.Exec.Command, Arguments: r.Exec.Arguments, Timeout: r.Exec.Timeout, }) + stdout, err := cmd.Stdout() if err != nil { panic(err) } - cb := r.Consumer.Consume(ctx, cmd.Stdout(), cmd.Stderr()) + stderr, err := cmd.Stderr() + if err != nil { + panic(err) + } + cb := r.Consumer.Consume(ctx, stdout, stderr) - resp, err := cmd.Run(ctx) + resp, err := cmd.Run() if err != nil { panic(err) } From cb4e0990b9d5b33aec16d9fbb764ac82190732ff Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Thu, 7 Sep 2023 12:35:35 -0700 Subject: [PATCH 3/8] remove superfluous mutex Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/command.go | 15 --------------- pkg/receiver/jobreceiver/command/command_test.go | 2 +- 2 files changed, 1 insertion(+), 16 deletions(-) diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go index 0269798dd4..927371df0c 100644 --- a/pkg/receiver/jobreceiver/command/command.go +++ b/pkg/receiver/jobreceiver/command/command.go @@ -4,7 +4,6 @@ import ( "context" "io" "os/exec" - "sync" "syscall" "time" ) @@ -18,9 +17,6 @@ const ( // status used when golang is unable to determine the exit // status. FallbackExitStatus int = 3 - - // ErrAlreadyExecuted don't do that. - ErrAlreadyExecuted = cError("invocation has already been started once") ) // ExecutionRequest @@ -58,9 +54,6 @@ type Execution struct { ctx context.Context cancel func() timeout time.Duration - - mu sync.Mutex - started bool } // Stdout and Stderr return Readers for the underlying execution's output @@ -74,14 +67,6 @@ func (c *Execution) Stderr() (io.Reader, error) { // Run the command. May only be invoked once. func (c *Execution) Run() (resp ExecutionResponse, err error) { - c.mu.Lock() - if c.started { - defer c.mu.Unlock() - return resp, ErrAlreadyExecuted - } - c.started = true - c.mu.Unlock() - defer c.cancel() started := time.Now() diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go index b965f0edb8..775055b2e2 100644 --- a/pkg/receiver/jobreceiver/command/command_test.go +++ b/pkg/receiver/jobreceiver/command/command_test.go @@ -135,7 +135,7 @@ func TestExecute(t *testing.T) { outC := eventualOutput(echo) echo.Run() _, err := echo.Run() - assert.ErrorIs(t, ErrAlreadyExecuted, err) + assert.Error(t, err) assert.Contains(t, <-outC, "hello world") }) } From 98c5bea91950ca93d46e0b39a4380996defb56f8 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Thu, 7 Sep 2023 13:31:20 -0700 Subject: [PATCH 4/8] Change OS specific command behavior defaults Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/command.go | 9 ++----- .../jobreceiver/command/exec_linux.go | 15 +++++++++++ pkg/receiver/jobreceiver/command/exec_unix.go | 18 +++++++++++++ .../jobreceiver/command/exec_windows.go | 26 +++++++++++++++++++ 4 files changed, 61 insertions(+), 7 deletions(-) create mode 100644 pkg/receiver/jobreceiver/command/exec_linux.go create mode 100644 pkg/receiver/jobreceiver/command/exec_unix.go create mode 100644 pkg/receiver/jobreceiver/command/exec_windows.go diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go index 927371df0c..7eacb7b4c0 100644 --- a/pkg/receiver/jobreceiver/command/command.go +++ b/pkg/receiver/jobreceiver/command/command.go @@ -41,6 +41,8 @@ func NewExecution(ctx context.Context, request ExecutionRequest) *Execution { if len(request.Env) > 0 { cmd.Env = request.Env } + + setOptions(cmd) return &Execution{ cmd: cmd, ctx: ctx, @@ -120,10 +122,3 @@ type ExecutionResponse struct { // Error is passed when the outcome of the execution is uncertain Error error } - -// cError const error type for sentinels -type cError string - -func (e cError) Error() string { - return string(e) -} diff --git a/pkg/receiver/jobreceiver/command/exec_linux.go b/pkg/receiver/jobreceiver/command/exec_linux.go new file mode 100644 index 0000000000..1da434940f --- /dev/null +++ b/pkg/receiver/jobreceiver/command/exec_linux.go @@ -0,0 +1,15 @@ +package command + +import ( + "os/exec" + "syscall" +) + +// setOptions sets the process group of the command processprocgroup +func setOptions(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pdeathsig: syscall.SIGTERM} + cmd.Cancel = func() error { + // Kill process group instead + return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + } +} diff --git a/pkg/receiver/jobreceiver/command/exec_unix.go b/pkg/receiver/jobreceiver/command/exec_unix.go new file mode 100644 index 0000000000..7f2b4ae670 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/exec_unix.go @@ -0,0 +1,18 @@ +//go:build !linux && !windows +// +build !linux,!windows + +package command + +import ( + "os/exec" + "syscall" +) + +// setOptions sets the process group of the command process +func setOptions(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Cancel = func() error { + // Kill process group instead + return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + } +} diff --git a/pkg/receiver/jobreceiver/command/exec_windows.go b/pkg/receiver/jobreceiver/command/exec_windows.go new file mode 100644 index 0000000000..28977a8787 --- /dev/null +++ b/pkg/receiver/jobreceiver/command/exec_windows.go @@ -0,0 +1,26 @@ +package command + +import ( + "fmt" + "os/exec" + "syscall" +) + +// setOptions sets the process group of the command process +func setOptions(cmd *exec.Cmd) { + cmd.SysProcAttr = &syscall.SysProcAttr{ + CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, + } + cmd.Cancel = func() error { + // Try with taskkill first + taskkill := exec.Command( + "taskkill", + "/T", "/F", "/PID", fmt.Sprint(cmd.Process.Pid), + ) + if err := taskkill.Run(); err == nil { + return nil + } + // Fall back to the default behavior + return cmd.Process.Kill() + } +} From 234b50527a050b55b8375995c36927ac0246e493 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Thu, 7 Sep 2023 16:29:44 -0700 Subject: [PATCH 5/8] annotate Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/exec_windows.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/receiver/jobreceiver/command/exec_windows.go b/pkg/receiver/jobreceiver/command/exec_windows.go index 28977a8787..844a5ac3d9 100644 --- a/pkg/receiver/jobreceiver/command/exec_windows.go +++ b/pkg/receiver/jobreceiver/command/exec_windows.go @@ -11,6 +11,8 @@ func setOptions(cmd *exec.Cmd) { cmd.SysProcAttr = &syscall.SysProcAttr{ CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, } + // Cancel functiona adapted from sensu-go's command package + // TODO(ck) may be worth looking into the windows Job Object api cmd.Cancel = func() error { // Try with taskkill first taskkill := exec.Command( From 65e91d1428c3623bf23e88e754f932b301a082f0 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Thu, 7 Sep 2023 16:51:51 -0700 Subject: [PATCH 6/8] remove irrelevant system attribute flag Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/exec_windows.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/receiver/jobreceiver/command/exec_windows.go b/pkg/receiver/jobreceiver/command/exec_windows.go index 844a5ac3d9..1ad8ea37f9 100644 --- a/pkg/receiver/jobreceiver/command/exec_windows.go +++ b/pkg/receiver/jobreceiver/command/exec_windows.go @@ -3,14 +3,10 @@ package command import ( "fmt" "os/exec" - "syscall" ) // setOptions sets the process group of the command process func setOptions(cmd *exec.Cmd) { - cmd.SysProcAttr = &syscall.SysProcAttr{ - CreationFlags: syscall.CREATE_NEW_PROCESS_GROUP, - } // Cancel functiona adapted from sensu-go's command package // TODO(ck) may be worth looking into the windows Job Object api cmd.Cancel = func() error { From c3ca4c7bc59d43ccde373ed08c2502dc84256688 Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Mon, 11 Sep 2023 14:01:20 -0700 Subject: [PATCH 7/8] improved ExecutionRequest godocs Signed-off-by: Christian Kruse --- pkg/receiver/jobreceiver/command/command.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go index 7eacb7b4c0..0865c14ff6 100644 --- a/pkg/receiver/jobreceiver/command/command.go +++ b/pkg/receiver/jobreceiver/command/command.go @@ -19,9 +19,10 @@ const ( FallbackExitStatus int = 3 ) -// ExecutionRequest +// ExecutionRequest describes an external command to be executed. type ExecutionRequest struct { - // Command is the command to be executed. + // Command is the command to be executed. This is the only field that must + // be set non-zero. Behaves like os/exec.Cmd's Path field. Command string // Arguments to execute the command with. @@ -30,7 +31,8 @@ type ExecutionRequest struct { // Env variables to include Env []string - // Execution timeout + // Timeout when set non-zero functions as a timer for starting and running + // a command on Execution.Run Timeout time.Duration } From 1a90798106b8b1e2d313b26ae5980057920ae89e Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Fri, 22 Sep 2023 08:24:06 -0700 Subject: [PATCH 8/8] fixes from review Signed-off-by: Christian Kruse --- .../jobreceiver/command/command_test.go | 44 +++++++------------ 1 file changed, 15 insertions(+), 29 deletions(-) diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go index 775055b2e2..2eb99c3c53 100644 --- a/pkg/receiver/jobreceiver/command/command_test.go +++ b/pkg/receiver/jobreceiver/command/command_test.go @@ -9,7 +9,6 @@ import ( "os" "strconv" "strings" - "sync" "testing" "time" @@ -64,13 +63,12 @@ func TestMain(m *testing.M) { } } -//nolint:all func TestExecute(t *testing.T) { ctx := context.Background() // Basic command execution t.Run("basic", func(t *testing.T) { echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}})) - outC := eventualOutput(echo) + outC := eventualOutput(t, echo) resp, err := echo.Run() require.NoError(t, err) assert.Equal(t, 0, resp.Status) @@ -132,9 +130,10 @@ func TestExecute(t *testing.T) { // Invocation cannot be spuriously re-invoked t.Run("cannot be ran twice", func(t *testing.T) { echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}})) - outC := eventualOutput(echo) - echo.Run() + outC := eventualOutput(t, echo) _, err := echo.Run() + require.NoError(t, err) + _, err = echo.Run() assert.Error(t, err) assert.Contains(t, <-outC, "hello world") }) @@ -150,34 +149,21 @@ func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest { return request } -//nolint:all -func eventualOutput(i *Execution) <-chan string { +func eventualOutput(t *testing.T, i *Execution) <-chan string { + t.Helper() out := make(chan string, 1) - stdout, _ := i.Stdout() - stderr, _ := i.Stderr() + stdout, err := i.Stdout() + require.NoError(t, err) + stderr, err := i.Stderr() + require.NoError(t, err) go func() { - var buf syncBuffer - io.Copy(&buf, stdout) - io.Copy(&buf, stderr) + var buf bytes.Buffer + _, err := io.Copy(&buf, stdout) + require.NoError(t, err) + _, err = io.Copy(&buf, stderr) + require.NoError(t, err) out <- buf.String() close(out) }() return out } - -type syncBuffer struct { - buf bytes.Buffer - mu sync.Mutex -} - -func (s *syncBuffer) Write(p []byte) (n int, err error) { - s.mu.Lock() - defer s.mu.Unlock() - return s.buf.Write(p) -} - -func (s *syncBuffer) String() string { - s.mu.Lock() - defer s.mu.Unlock() - return s.buf.String() -}