diff --git a/pkg/receiver/jobreceiver/command/buffer.go b/pkg/receiver/jobreceiver/command/buffer.go
new file mode 100644
index 0000000000..6e592a56b0
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/buffer.go
@@ -0,0 +1,27 @@
+package command
+
+import (
+	"bytes"
+	"sync"
+)
+
+// SyncBuffer is a bytes.Buffer guarded by a mutex. It can be used to buffer
+// both output streams in a monitoring plugin spec compliant way.
+type SyncBuffer struct {
+	buf bytes.Buffer
+	mu  sync.Mutex
+}
+
+// Write appends p to the buffer while holding the lock.
+func (s *SyncBuffer) Write(p []byte) (n int, err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.buf.Write(p)
+}
+
+// String returns the buffered content while holding the lock.
+func (s *SyncBuffer) String() string {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.buf.String()
+}
diff --git a/pkg/receiver/jobreceiver/command/command.go b/pkg/receiver/jobreceiver/command/command.go
new file mode 100644
index 0000000000..ea4d2bbe4e
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/command.go
@@ -0,0 +1,226 @@
+package command
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"math"
+	"os"
+	"os/exec"
+	"sync"
+	"syscall"
+	"time"
+)
+
+const (
+	// TimeoutOutput specifies the command execution output in the
+	// event of an execution timeout.
+	TimeoutOutput string = "Execution timed out\n"
+
+	// OKExitStatus specifies the command execution exit status
+	// that indicates a success, A-OK.
+	OKExitStatus int = 0
+
+	// TimeoutExitStatus specifies the command execution exit
+	// status in the event of an execution timeout.
+	TimeoutExitStatus int = 2
+
+	// FallbackExitStatus specifies the command execution exit
+	// status used when golang is unable to determine the exit
+	// status.
+	FallbackExitStatus int = 3
+
+	// ErrAlreadyExecuted is returned by Run when the invocation was already started.
+	ErrAlreadyExecuted = cError("invocation has already been started once")
+)
+
+// ExecutionRequest provides information about a system command execution,
+// somewhat of an abstraction intended to be used for monitoring jobs.
+type ExecutionRequest struct {
+	// Command is the command to be executed.
+	Command string
+
+	// Arguments to execute the command with.
+	Arguments []string
+
+	// Env is the environment of the command; when empty, cmd.Env is left unset.
+	Env []string
+
+	// Timeout is the execution timeout; no timeout is applied when it is <= 0.
+	Timeout time.Duration
+}
+
+// Invocation is a single-use handle on a command execution and its output streams.
+type Invocation interface {
+	// Stdout stream. Must be closed by the caller.
+	Stdout() io.ReadCloser
+	// Stderr stream. Must be closed by the caller.
+	Stderr() io.ReadCloser
+	// Run the command. Only valid once.
+	Run(context.Context) (ExecutionResponse, error)
+}
+
+// NewInvocation creates an Invocation for request, backed by one OS pipe per
+// output stream.
+func NewInvocation(request ExecutionRequest) (Invocation, error) {
+	c := &invocation{ExecutionRequest: request}
+	var err error
+	c.stdoutReader, c.stdoutWritter, err = os.Pipe()
+	if err != nil {
+		return c, err
+	}
+	c.stderrReader, c.stderrWritter, err = os.Pipe()
+	if err != nil {
+		// Do not leak the stdout pipe when the stderr pipe cannot be created.
+		c.stdoutReader.Close()
+		c.stdoutWritter.Close()
+	}
+	return c, err
+}
+
+// invocation implements Invocation on top of os/exec.
+type invocation struct {
+	ExecutionRequest
+
+	stdoutWritter io.WriteCloser
+	stderrWritter io.WriteCloser
+	stdoutReader  io.ReadCloser
+	stderrReader  io.ReadCloser
+
+	mu      sync.Mutex // guards started
+	started bool
+}
+
+// Stdout returns the read end of the command's stdout pipe.
+func (c *invocation) Stdout() io.ReadCloser {
+	return c.stdoutReader
+}
+
+// Stderr returns the read end of the command's stderr pipe.
+func (c *invocation) Stderr() io.ReadCloser {
+	return c.stderrReader
+}
+
+// Run starts the command and blocks until it exits or the request Timeout
+// elapses, whichever comes first. It may only be called once; later calls
+// return ErrAlreadyExecuted.
+//
+// resp is a named result so that the deferred Duration assignment is visible
+// to the caller.
+func (c *invocation) Run(ctx context.Context) (resp ExecutionResponse, err error) {
+	c.mu.Lock()
+	if c.started {
+		c.mu.Unlock()
+		return resp, ErrAlreadyExecuted
+	}
+	c.started = true
+	c.mu.Unlock()
+
+	// Using a platform specific shell to "cheat", as the shell
+	// will handle certain failures for us, where golang exec is
+	// known to have troubles, e.g. command not found. We still
+	// use a fallback exit status in the unlikely event that the
+	// exit status cannot be determined.
+	var cmd *exec.Cmd
+
+	// Use context.WithCancel for command execution timeout.
+	// context.WithTimeout will not kill child/grandchild processes
+	// (see issues tagged in https://github.com/sensu/sensu-go/issues/781).
+	// Rather, we will use a timer, CancelFunc and proc functions
+	// to perform full cleanup.
+	ctx, timeout := context.WithCancel(ctx)
+	defer timeout()
+
+	// Taken from Sensu-Spawn (Sensu 1.x.x).
+	cmd = command(ctx, c.Command, c.Arguments)
+
+	// Set the ENV for the command if it is set
+	if len(c.Env) > 0 {
+		cmd.Env = c.Env
+	}
+
+	cmd.Stdout = c.stdoutWritter
+	cmd.Stderr = c.stderrWritter
+	defer func() {
+		c.stdoutWritter.Close()
+		c.stderrWritter.Close()
+	}()
+
+	started := time.Now()
+	defer func() {
+		resp.Duration = time.Since(started)
+	}()
+
+	// Arm a single timer so that the deferred Stop applies to the timer that
+	// is actually selected on below.
+	timeoutAfter := time.Duration(math.MaxInt64)
+	if c.Timeout > 0 {
+		setProcessGroup(cmd)
+		timeoutAfter = c.Timeout
+	}
+	timer := time.NewTimer(timeoutAfter)
+	defer timer.Stop()
+	if err = cmd.Start(); err != nil {
+		// Something unexpected happened when attempting to
+		// fork/exec, return immediately.
+		return resp, err
+	}
+
+	waitCh := make(chan struct{})
+	var waitErr error
+	go func() {
+		waitErr = cmd.Wait()
+		close(waitCh)
+	}()
+
+	// Wait for the process to complete or the timer to trigger, whichever comes first.
+	var killErr error
+	select {
+	case <-waitCh:
+		if waitErr != nil {
+			// The command most likely returned a non-zero exit status.
+			if exitError, ok := waitErr.(*exec.ExitError); ok {
+				// Best effort to determine the exit status, this
+				// should work on Linux, OSX, and Windows.
+				if status, ok := exitError.Sys().(syscall.WaitStatus); ok {
+					resp.Status = status.ExitStatus()
+				} else {
+					resp.Status = FallbackExitStatus
+				}
+			} else {
+				resp.Status = FallbackExitStatus
+			}
+		} else {
+			// Everything is A-OK.
+			resp.Status = OKExitStatus
+		}
+
+	case <-timer.C:
+		var killErrOutput string
+		if killErr = killProcess(cmd); killErr != nil {
+			killErrOutput = fmt.Sprintf("Unable to TERM/KILL the process: #%d\n", cmd.Process.Pid)
+		}
+		timeout()
+		fmt.Fprintf(c.stderrWritter, "%s%s", TimeoutOutput, killErrOutput)
+		resp.Status = TimeoutExitStatus
+	}
+
+	return resp, nil
+}
+
+// ExecutionResponse provides the response information of an ExecutionRequest.
+type ExecutionResponse struct {
+	// Command execution exit status.
+	Status int
+
+	// Duration provides command execution time.
+	Duration time.Duration
+}
+
+// cError const error type for sentinels
+type cError string
+
+// Error implements the error interface.
+func (e cError) Error() string {
+	return string(e)
+}
diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go
new file mode 100644
index 0000000000..803c6fe5b8
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/command_test.go
@@ -0,0 +1,154 @@
+package command
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+// TestHelperProcess is a target that helps this suite emulate
+// platform specific executables in a way that is platform agnostic.
+func TestHelperProcess(t *testing.T) {
+	if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
+		return
+	}
+	var args []string
+	command := os.Args[3]
+	if len(os.Args) > 4 {
+		args = os.Args[4:]
+	}
+
+	switch command {
+	case "echo":
+		fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " "))
+	case "exit":
+		if i, err := strconv.ParseInt(args[0], 10, 32); err == nil {
+			os.Exit(int(i))
+		}
+		panic("unexpected exit argument")
+	case "sleep":
+		if d, err := time.ParseDuration(args[0]); err == nil {
+			time.Sleep(d)
+			return
+		}
+		if i, err := strconv.ParseInt(args[0], 10, 64); err == nil {
+			time.Sleep(time.Second * time.Duration(i))
+			return
+		}
+	}
+
+}
+
+//nolint:all
+func TestExecute(t *testing.T) {
+
+	ctx := context.Background()
+	// Basic command execution
+	echo := fakeCommand(t, "echo", "hello", "world")
+	outC := eventualOutput(echo)
+	resp, err := echo.Run(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, 0, resp.Status)
+	assert.True(t, resp.Duration > 0, "Run must report the execution duration")
+	assert.Contains(t, <-outC, "hello world")
+
+	// Command exits non-zero
+	exitCmd := fakeCommand(t, "exit", "1")
+	eventualOutput(exitCmd)
+	resp, err = exitCmd.Run(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, 1, resp.Status)
+	exitCmd = fakeCommand(t, "exit", "33")
+	eventualOutput(exitCmd)
+	resp, err = exitCmd.Run(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, 33, resp.Status)
+
+	// Command canceled by context
+	timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
+	sleepCmd := fakeCommand(t, "sleep", "1m")
+	eventualOutput(sleepCmd)
+	done := make(chan struct{})
+	go func() {
+		_, err := sleepCmd.Run(timeoutCtx)
+		assert.NoError(t, err)
+		close(done)
+	}()
+	select {
+	case <-time.After(time.Second):
+		t.Errorf("command context expired but was not killed")
+	case <-done:
+		// okay
+	}
+	cancel()
+
+	// Command exceeds timeout
+	sleepCmd = fakeCommand(t, "sleep", "1m")
+	sleepCmd.Timeout = time.Millisecond * 100
+	eventualOutput(sleepCmd)
+	done = make(chan struct{})
+	go func() {
+		resp, err := sleepCmd.Run(ctx)
+		assert.NoError(t, err)
+		assert.Equal(t, TimeoutExitStatus, resp.Status)
+		close(done)
+	}()
+	select {
+	case <-time.After(5 * time.Second):
+		t.Errorf("command timeout exceeded but was not killed")
+	case <-done:
+		// okay
+	}
+
+	// Invocation cannot be spuriously re-invoked
+	echo = fakeCommand(t, "echo", "hello", "world")
+	outC = eventualOutput(echo)
+	echo.Run(ctx)
+	_, err = echo.Run(ctx)
+	assert.ErrorIs(t, err, ErrAlreadyExecuted)
+	assert.Contains(t, <-outC, "hello world")
+}
+
+// fakeCommand takes a command and (optionally) command args and will execute
+// the TestHelperProcess test within the package FakeCommand is called from.
+func fakeCommand(t *testing.T, command string, args ...string) *invocation {
+	cargs := []string{"-test.run=TestHelperProcess", "--", command}
+	cargs = append(cargs, args...)
+	env := []string{"GO_WANT_HELPER_PROCESS=1"}
+
+	execution := ExecutionRequest{
+		Command:   os.Args[0],
+		Arguments: cargs,
+		Env:       env,
+	}
+
+	c, err := NewInvocation(execution)
+	require.NoError(t, err)
+	cmd, ok := c.(*invocation)
+	require.True(t, ok)
+	return cmd
+}
+
+// eventualOutput drains stdout and then stderr of i in the background and
+// delivers the combined output on the returned channel once both reach EOF.
+func eventualOutput(i Invocation) <-chan string {
+	out := make(chan string, 1)
+	go func() {
+		var buf SyncBuffer
+		defer i.Stdout().Close()
+		defer i.Stderr().Close()
+		io.Copy(&buf, i.Stdout())
+		io.Copy(&buf, i.Stderr())
+		out <- buf.String()
+		close(out)
+	}()
+	return out
+}
diff --git a/pkg/receiver/jobreceiver/command/doc.go b/pkg/receiver/jobreceiver/command/doc.go
new file mode 100644
index 0000000000..0631965fdd
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/doc.go
@@ -0,0 +1,8 @@
+/*
+Package command manages the execution of commands.
+
+It has been lightly adapted from the `github.com/sensu/sensu-go/command` package
+that has been successfully used on the sensu-go's multi-platform agents, intended
+to run checks in a Nagios plugin compatible way.
+*/
+package command
diff --git a/pkg/receiver/jobreceiver/command/proc_group_linux.go b/pkg/receiver/jobreceiver/command/proc_group_linux.go
new file mode 100644
index 0000000000..459c3e83e0
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/proc_group_linux.go
@@ -0,0 +1,11 @@
+package command
+
+import (
+	"os/exec"
+	"syscall"
+)
+
+// setProcessGroup sets the process group of the command process
+func setProcessGroup(cmd *exec.Cmd) {
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pdeathsig: syscall.SIGTERM}
+}
diff --git a/pkg/receiver/jobreceiver/command/proc_group_unix.go b/pkg/receiver/jobreceiver/command/proc_group_unix.go
new file mode 100644
index 0000000000..54d52b7959
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/proc_group_unix.go
@@ -0,0 +1,14 @@
+//go:build !linux && !windows
+// +build !linux,!windows
+
+package command
+
+import (
+	"os/exec"
+	"syscall"
+)
+
+// setProcessGroup sets the process group of the command process
+func setProcessGroup(cmd *exec.Cmd) {
+	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+}
diff --git a/pkg/receiver/jobreceiver/command/proc_group_windows.go b/pkg/receiver/jobreceiver/command/proc_group_windows.go
new file mode 100644
index 0000000000..478ddc68b1
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/proc_group_windows.go
@@ -0,0 +1,14 @@
+package command
+
+import (
+	"os/exec"
+	"syscall"
+)
+
+// setProcessGroup sets the process group of the command process
+func setProcessGroup(cmd *exec.Cmd) {
+	if cmd.SysProcAttr == nil {
+		cmd.SysProcAttr = &syscall.SysProcAttr{}
+	}
+	cmd.SysProcAttr.CreationFlags |= syscall.CREATE_NEW_PROCESS_GROUP
+}
diff --git a/pkg/receiver/jobreceiver/command/proc_unix.go b/pkg/receiver/jobreceiver/command/proc_unix.go
new file mode 100644
index 0000000000..aaa4bac7fb
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/proc_unix.go
@@ -0,0 +1,21 @@
+//go:build !windows
+// +build !windows
+
+package command
+
+import (
+	"context"
+	"os/exec"
+	"syscall"
+)
+
+// command returns a command to execute an executable
+func command(ctx context.Context, command string, args []string) *exec.Cmd {
+	return exec.CommandContext(ctx, command, args...)
+}
+
+// killProcess kills the command process and any child processes. The negative
+// PID signals the whole process group, so setProcessGroup must have been applied.
+func killProcess(cmd *exec.Cmd) error {
+	return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
+}
diff --git a/pkg/receiver/jobreceiver/command/proc_windows.go b/pkg/receiver/jobreceiver/command/proc_windows.go
new file mode 100644
index 0000000000..de7db3b87a
--- /dev/null
+++ b/pkg/receiver/jobreceiver/command/proc_windows.go
@@ -0,0 +1,65 @@
+//go:build windows
+// +build windows
+
+package command
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"strings"
+	"syscall"
+)
+
+// command returns a command to execute an executable
+func command(ctx context.Context, command string, args []string) *exec.Cmd {
+	cmd := exec.CommandContext(ctx, "cmd")
+	cl := append([]string{command}, args...)
+	cmd.SysProcAttr = &syscall.SysProcAttr{
+		// Manually set the command line arguments so they are not escaped
+		// https://github.com/golang/go/commit/f18a4e9609aac3aa83d40920c12b9b45f9376aea
+		// http://www.josephspurrier.com/prevent-escaping-exec-command-arguments-in-go/
+		CmdLine: strings.Join(cl, " "),
+	}
+	return cmd
+}
+
+// killProcess kills the command process and any child processes
+func killProcess(cmd *exec.Cmd) error {
+	process := cmd.Process
+	if process == nil {
+		return nil
+	}
+
+	args := []string{fmt.Sprintf("/T /F /PID %d", process.Pid)}
+	err := command(context.Background(), "taskkill", args).Run()
+	if err == nil {
+		return nil
+	}
+
+	err = forceKill(process)
+	if err == nil {
+		return nil
+	}
+	// Last resort: report success when the kill signal is delivered, and
+	// surface the underlying error otherwise.
+	if err = process.Signal(os.Kill); err == nil {
+		return nil
+	}
+
+	return fmt.Errorf("could not kill process: %w", err)
+}
+
+// forceKill terminates the process through a PROCESS_TERMINATE handle.
+func forceKill(process *os.Process) error {
+	handle, err := syscall.OpenProcess(syscall.PROCESS_TERMINATE, true, uint32(process.Pid))
+	if err != nil {
+		return err
+	}
+
+	err = syscall.TerminateProcess(handle, 0)
+	_ = syscall.CloseHandle(handle)
+
+	return err
+}
diff --git a/pkg/receiver/jobreceiver/output/consumer/consumer.go b/pkg/receiver/jobreceiver/output/consumer/consumer.go
index 79c72bba75..a47003d249 100644
--- a/pkg/receiver/jobreceiver/output/consumer/consumer.go
+++ b/pkg/receiver/jobreceiver/output/consumer/consumer.go
@@ -13,7 +13,7 @@ import (
 
 // Interface consumes command output and emits telemetry data
 type Interface interface {
-	Consume(stdin, stderr io.Reader) CloseFunc
+	Consume(ctx context.Context, stdout, stderr io.ReadCloser) CloseFunc
 }
 
 // CloseFunc
@@ -45,10 +45,12 @@ type DemoConsumer struct {
 }
 
 // Consume reads stdout line by line and produces entries
-func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc {
-	ctx, cancel := context.WithCancel(context.Background())
+func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.ReadCloser) CloseFunc {
+	ctx, cancel := context.WithCancel(ctx)
+	stderr.Close()
 	go func() {
 		scanner := bufio.NewScanner(stdout)
+		defer stdout.Close()
 		for {
 			select {
 			case <-ctx.Done():
@@ -56,6 +58,9 @@ func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc {
 			default:
 			}
 			if !scanner.Scan() {
+				if scanner.Err() != nil {
+					panic(scanner.Err())
+				}
 				return
 			}
 			ent, err := p.NewEntry(scanner.Text())
diff --git a/pkg/receiver/jobreceiver/runner.go b/pkg/receiver/jobreceiver/runner.go
index 9e9472931d..0257e69fd6 100644
--- a/pkg/receiver/jobreceiver/runner.go
+++ b/pkg/receiver/jobreceiver/runner.go
@@ -1,10 +1,10 @@
 package jobreceiver
 
 import (
-	"bytes"
-	"fmt"
+	"context"
 
 	"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/builder"
+	"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/command"
 	"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 	"go.uber.org/zap"
@@ -13,18 +13,44 @@ import (
 // Build returns the job runner, the process responsible for scheduling and
 // running commands and piping their output to the output consumer.
 func (c Config) Build(logger *zap.SugaredLogger, out consumer.Interface) (builder.JobRunner, error) {
-	return &stubRunner{Consumer: out}, nil
+	return &stubRunner{
+		Exec:     c.Exec,
+		Consumer: out,
+	}, nil
 }
 
 // stubRunner is a stub implementation.
 type stubRunner struct {
+	Exec     ExecutionConfig
 	Consumer consumer.Interface
 }
 
+// Start stub impl. runs command once at startup then idles indefinitely.
 func (r *stubRunner) Start(operator.Persister) error {
-	var buf bytes.Buffer
-	fmt.Fprint(&buf, "hello world. This is a placeholder message.")
-	r.Consumer.Consume(&buf, &bytes.Buffer{})
+	go func() {
+		ctx := context.Background()
+		cmd, err := command.NewInvocation(command.ExecutionRequest{
+			Command:   r.Exec.Command,
+			Arguments: r.Exec.Arguments,
+			Timeout:   r.Exec.Timeout,
+		})
+
+		if err != nil {
+			panic(err)
+		}
+		cb := r.Consumer.Consume(ctx, cmd.Stdout(), cmd.Stderr())
+
+		resp, err := cmd.Run(ctx)
+		if err != nil {
+			panic(err)
+		}
+		cb(consumer.ExecutionSummary{
+			Command:     r.Exec.Command,
+			ExitCode:    resp.Status,
+			RunDuration: resp.Duration,
+		})
+
+	}()
 	return nil
 }
 
diff --git a/pkg/receiver/jobreceiver/testdata/usage.yaml b/pkg/receiver/jobreceiver/testdata/usage.yaml
index 9963505b18..2f5e904a74 100644
--- a/pkg/receiver/jobreceiver/testdata/usage.yaml
+++ b/pkg/receiver/jobreceiver/testdata/usage.yaml
@@ -34,9 +34,11 @@ receivers:
     schedule:
       interval: 1h
     exec:
-      command: echo
+      command: /bin/sh
+      timeout: 1m
       arguments:
-        - "hello world"
+        - '-c'
+        - 'for i in {0..120}; do date -Iseconds --date "90 seconds ago" && sleep 1; done;'
     output:
       type: event
       event:
@@ -49,22 +51,16 @@ receivers:
         resource:
           bingo: bango
     operators:
-      - type: regex_parser
+      - type: time_parser
         parse_from: body
-        regex: '^(?P\w+).*$'
-      - type: severity_parser
-        parse_from: attributes.first
-        mapping:
-          error: total
-          warn:
-            - nonsense
-            - '.'
-            - hello
+        layout_type: gotime
+        layout: '2006-01-02T15:04:05-07:00'
 
 
 exporters:
   logging:
     verbosity: detailed
+    sampling_initial: 32
 
 processors:
   memory_limiter: