Skip to content

Commit

Permalink
[pkg/receiver/jobreceiver] Add command package
Browse files Browse the repository at this point in the history
Adds a new jobreceiver/command package that encapsulates the handling of
executing commands. Mostly borrowed from sensu-go's robust commands
package used by it's multi-platform agents.

Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse committed Sep 6, 2023
1 parent 65aad8c commit 847e57a
Show file tree
Hide file tree
Showing 12 changed files with 556 additions and 21 deletions.
25 changes: 25 additions & 0 deletions pkg/receiver/jobreceiver/command/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package command

import (
"bytes"
"sync"
)

// SyncBuffer can be used to buffer both output streams to
// in a monitoring plugin spec compliant way.
type SyncBuffer struct {
buf bytes.Buffer
mu sync.Mutex
}

func (s *SyncBuffer) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.buf.Write(p)
}

func (s *SyncBuffer) String() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.buf.String()
}
207 changes: 207 additions & 0 deletions pkg/receiver/jobreceiver/command/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package command

import (
"context"
"fmt"
"io"
"math"
"os"
"os/exec"
"sync"
"syscall"
"time"
)

const (
// TimeoutOutput specifies the command execution output in the
// event of an execution timeout.
TimeoutOutput string = "Execution timed out\n"

// OKExitStatus specifies the command execution exit status
// that indicates a success, A-OK.
OKExitStatus int = 0

// TimeoutExitStatus specifies the command execution exit
// status in the event of an execution timeout.
TimeoutExitStatus int = 2

// FallbackExitStatus specifies the command execution exit
// status used when golang is unable to determine the exit
// status.
FallbackExitStatus int = 3

// ErrAlreadyExecuted don't do that.
ErrAlreadyExecuted = cError("invocation has already been started once")
)

// ExecutionRequest provides information about a system command execution,
// somewhat of an abstraction intended to be used for monitoring jobs
type ExecutionRequest struct {
// Command is the command to be executed.
Command string

// Arguments to execute the command with.
Arguments []string

// Env ...
Env []string

// Execution timeout
Timeout time.Duration
}

type Invocation interface {
// Stdout stream. Must by closed by the caller.
Stdout() io.ReadCloser
// Stderr stream. Must by closed by the caller.
Stderr() io.ReadCloser
// Run the command. Only valid once
Run(context.Context) (ExecutionResponse, error)
}

// NewInvocation based on an ExecutionRequest
func NewInvocation(request ExecutionRequest) (Invocation, error) {
c := &invocation{ExecutionRequest: request}
var err error
c.stdoutReader, c.stdoutWritter, err = os.Pipe()
if err != nil {
return c, err
}
c.stderrReader, c.stderrWritter, err = os.Pipe()
return c, err
}

type invocation struct {
ExecutionRequest

stdoutWritter io.WriteCloser
stderrWritter io.WriteCloser
stdoutReader io.ReadCloser
stderrReader io.ReadCloser

mu sync.Mutex
started bool
}

func (c *invocation) Stdout() io.ReadCloser {
return c.stdoutReader
}
func (c *invocation) Stderr() io.ReadCloser {
return c.stderrReader
}
func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) {
resp := ExecutionResponse{}

c.mu.Lock()
if c.started {
defer c.mu.Unlock()
return resp, ErrAlreadyExecuted
}
c.started = true
c.mu.Unlock()

// Using a platform specific shell to "cheat", as the shell
// will handle certain failures for us, where golang exec is
// known to have troubles, e.g. command not found. We still
// use a fallback exit status in the unlikely event that the
// exit status cannot be determined.
var cmd *exec.Cmd

// Use context.WithCancel for command execution timeout.
// context.WithTimeout will not kill child/grandchild processes
// (see issues tagged in https://github.com/sensu/sensu-go/issues/781).
// Rather, we will use a timer, CancelFunc and proc functions
// to perform full cleanup.
ctx, timeout := context.WithCancel(ctx)
defer timeout()

// Taken from Sensu-Spawn (Sensu 1.x.x).
cmd = command(ctx, c.Command, c.Arguments)

// Set the ENV for the command if it is set
if len(c.Env) > 0 {
cmd.Env = c.Env
}

cmd.Stdout = c.stdoutWritter
cmd.Stderr = c.stderrWritter
defer func() {
c.stdoutWritter.Close()
c.stderrWritter.Close()
}()

started := time.Now()
defer func() {
resp.Duration = time.Since(started)
}()

timer := time.NewTimer(math.MaxInt64)
defer timer.Stop()
if c.Timeout > 0 {
setProcessGroup(cmd)
timer.Stop()
timer = time.NewTimer(c.Timeout)
}
if err := cmd.Start(); err != nil {
// Something unexpected happened when attempting to
// fork/exec, return immediately.
return resp, err
}

waitCh := make(chan struct{})
var err error
go func() {
err = cmd.Wait()
close(waitCh)
}()

// Wait for the process to complete or the timer to trigger, whichever comes first.
var killErr error
select {
case <-waitCh:
if err != nil {
// The command most likely return a non-zero exit status.
if exitError, ok := err.(*exec.ExitError); ok {
// Best effort to determine the exit status, this
// should work on Linux, OSX, and Windows.
if status, ok := exitError.Sys().(syscall.WaitStatus); ok {
resp.Status = status.ExitStatus()
} else {
resp.Status = FallbackExitStatus
}
} else {
resp.Status = FallbackExitStatus
}
} else {
// Everything is A-OK.
resp.Status = OKExitStatus
}

case <-timer.C:
var killErrOutput string
if killErr = killProcess(cmd); killErr != nil {
killErrOutput = fmt.Sprintf("Unable to TERM/KILL the process: #%d\n", cmd.Process.Pid)
}
timeout()
fmt.Fprintf(c.stderrWritter, "%s%s", TimeoutOutput, killErrOutput)
resp.Status = TimeoutExitStatus
}

return resp, nil
}

// ExecutionResponse provides the response information of an ExecutionRequest.
type ExecutionResponse struct {
// Command execution exit status.
Status int

// Duration provides command execution time.
Duration time.Duration
}

// cError const error type for sentinels
type cError string

func (e cError) Error() string {
return string(e)
}
152 changes: 152 additions & 0 deletions pkg/receiver/jobreceiver/command/command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package command

import (
"context"
"fmt"
"io"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestHelperProcess is a target that helps this suite emulate
// platform specific executables in a way that is platform agnostic.
func TestHelperProcess(t *testing.T) {
if os.Getenv("GO_WANT_HELPER_PROCESS") != "1" {
return
}
var args []string
command := os.Args[3]
if len(os.Args) > 4 {
args = os.Args[4:]
}

switch command {
case "echo":
fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " "))
case "exit":
if i, err := strconv.ParseInt(args[0], 10, 32); err == nil {
os.Exit(int(i))
}
panic("unexpected exit argument")
case "sleep":
if d, err := time.ParseDuration(args[0]); err == nil {
time.Sleep(d)
return
}
if i, err := strconv.ParseInt(args[0], 10, 64); err == nil {
time.Sleep(time.Second * time.Duration(i))
return
}
}

}

//nolint:all
func TestExecute(t *testing.T) {

ctx := context.Background()
// Basic command execution
echo := fakeCommand(t, "echo", "hello", "world")
outC := eventualOutput(echo)
resp, err := echo.Run(ctx)
require.NoError(t, err)
assert.Equal(t, 0, resp.Status)
require.NoError(t, err)
assert.Contains(t, <-outC, "hello world")

// Command exits non-zero
exitCmd := fakeCommand(t, "exit", "1")
eventualOutput(exitCmd)
resp, err = exitCmd.Run(ctx)
require.NoError(t, err)
assert.Equal(t, 1, resp.Status)
exitCmd = fakeCommand(t, "exit", "33")
eventualOutput(exitCmd)
resp, err = exitCmd.Run(ctx)
require.NoError(t, err)
assert.Equal(t, 33, resp.Status)

// Command canceled by context
timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond*100)
sleepCmd := fakeCommand(t, "sleep", "1m")
eventualOutput(sleepCmd)
done := make(chan struct{})
go func() {
resp, err = sleepCmd.Run(timeoutCtx)
require.NoError(t, err)
close(done)
}()
select {
case <-time.After(time.Second):
t.Errorf("command context expired but was not killed")
case <-done:
// okay
}
cancel()

// Command exceeds timeout
sleepCmd = fakeCommand(t, "sleep", "1m")
sleepCmd.Timeout = time.Millisecond * 100
eventualOutput(sleepCmd)
done = make(chan struct{})
go func() {
resp, err := sleepCmd.Run(ctx)
assert.NoError(t, err)
assert.Equal(t, TimeoutExitStatus, resp.Status)
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("command timeout exceeded but was not killed")
case <-done:
// okay
}

// Invocation cannot be spuriously re-invoked
echo = fakeCommand(t, "echo", "hello", "world")
outC = eventualOutput(echo)
echo.Run(ctx)
_, err = echo.Run(ctx)
assert.ErrorIs(t, ErrAlreadyExecuted, err)
assert.Contains(t, <-outC, "hello world")
}

// fakeCommand takes a command and (optionally) command args and will execute
// the TestHelperProcess test within the package FakeCommand is called from.
func fakeCommand(t *testing.T, command string, args ...string) *invocation {
cargs := []string{"-test.run=TestHelperProcess", "--", command}
cargs = append(cargs, args...)
env := []string{"GO_WANT_HELPER_PROCESS=1"}

execution := ExecutionRequest{
Command: os.Args[0],
Arguments: cargs,
Env: env,
}

c, err := NewInvocation(execution)
require.NoError(t, err)
cmd, ok := c.(*invocation)
require.True(t, ok)
return cmd
}

func eventualOutput(i Invocation) <-chan string {
out := make(chan string, 1)
go func() {
var buf SyncBuffer
defer i.Stdout().Close()
defer i.Stderr().Close()
io.Copy(&buf, i.Stdout())
io.Copy(&buf, i.Stderr())
out <- buf.String()
close(out)
}()
return out
}
8 changes: 8 additions & 0 deletions pkg/receiver/jobreceiver/command/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/*
Package command manages the execution of commands.
It has been lightly adapted from the `github.com/sensu/sensu-go/command` package
that has been successfully used on the sensu-go's multi-platform agents, intended
to run checks in a Nagios plugin compatible way.
*/
package command
Loading

0 comments on commit 847e57a

Please sign in to comment.