Skip to content

Commit

Permalink
simplify command package
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse committed Sep 7, 2023
1 parent 847e57a commit c407b49
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 359 deletions.
25 changes: 0 additions & 25 deletions pkg/receiver/jobreceiver/command/buffer.go

This file was deleted.

163 changes: 50 additions & 113 deletions pkg/receiver/jobreceiver/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,18 @@ package command

import (
"context"
"fmt"
"io"
"math"
"os"
"os/exec"
"sync"
"syscall"
"time"
)

const (
// TimeoutOutput specifies the command execution output in the
// event of an execution timeout.
TimeoutOutput string = "Execution timed out\n"

// OKExitStatus specifies the command execution exit status
// that indicates a success, A-OK.
OKExitStatus int = 0

// TimeoutExitStatus specifies the command execution exit
// status in the event of an execution timeout.
TimeoutExitStatus int = 2

// FallbackExitStatus specifies the command execution exit
// status used when golang is unable to determine the exit
// status.
Expand All @@ -34,64 +23,57 @@ const (
ErrAlreadyExecuted = cError("invocation has already been started once")
)

// ExecutionRequest provides information about a system command execution,
// somewhat of an abstraction intended to be used for monitoring jobs
// ExecutionRequest
type ExecutionRequest struct {
// Command is the command to be executed.
Command string

// Arguments to execute the command with.
Arguments []string

// Env ...
// Env variables to include
Env []string

// Execution timeout
Timeout time.Duration
}

type Invocation interface {
// Stdout stream. Must by closed by the caller.
Stdout() io.ReadCloser
// Stderr stream. Must by closed by the caller.
Stderr() io.ReadCloser
// Run the command. Only valid once
Run(context.Context) (ExecutionResponse, error)
}

// NewInvocation based on an ExecutionRequest
func NewInvocation(request ExecutionRequest) (Invocation, error) {
c := &invocation{ExecutionRequest: request}
var err error
c.stdoutReader, c.stdoutWritter, err = os.Pipe()
if err != nil {
return c, err
// NewExecution based on an ExecutionRequest
func NewExecution(ctx context.Context, request ExecutionRequest) *Execution {
ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, request.Command, request.Arguments...)
if len(request.Env) > 0 {
cmd.Env = request.Env
}
return &Execution{
cmd: cmd,
ctx: ctx,
cancel: cancel,
timeout: request.Timeout,
}
c.stderrReader, c.stderrWritter, err = os.Pipe()
return c, err
}

type invocation struct {
ExecutionRequest

stdoutWritter io.WriteCloser
stderrWritter io.WriteCloser
stdoutReader io.ReadCloser
stderrReader io.ReadCloser
type Execution struct {
cmd *exec.Cmd
ctx context.Context
cancel func()
timeout time.Duration

mu sync.Mutex
started bool
}

func (c *invocation) Stdout() io.ReadCloser {
return c.stdoutReader
// Stdout and Stderr return Readers for the underlying execution's output
// streams. Run must be called subsequently or file descriptors may be leaked.
func (c *Execution) Stdout() (io.Reader, error) {
return c.cmd.StdoutPipe()
}
func (c *invocation) Stderr() io.ReadCloser {
return c.stderrReader
func (c *Execution) Stderr() (io.Reader, error) {
return c.cmd.StderrPipe()
}
func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) {
resp := ExecutionResponse{}

// Run the command. May only be invoked once.
func (c *Execution) Run() (resp ExecutionResponse, err error) {
c.mu.Lock()
if c.started {
defer c.mu.Unlock()
Expand All @@ -100,91 +82,43 @@ func (c *invocation) Run(ctx context.Context) (ExecutionResponse, error) {
c.started = true
c.mu.Unlock()

// Using a platform specific shell to "cheat", as the shell
// will handle certain failures for us, where golang exec is
// known to have troubles, e.g. command not found. We still
// use a fallback exit status in the unlikely event that the
// exit status cannot be determined.
var cmd *exec.Cmd

// Use context.WithCancel for command execution timeout.
// context.WithTimeout will not kill child/grandchild processes
// (see issues tagged in https://github.com/sensu/sensu-go/issues/781).
// Rather, we will use a timer, CancelFunc and proc functions
// to perform full cleanup.
ctx, timeout := context.WithCancel(ctx)
defer timeout()

// Taken from Sensu-Spawn (Sensu 1.x.x).
cmd = command(ctx, c.Command, c.Arguments)

// Set the ENV for the command if it is set
if len(c.Env) > 0 {
cmd.Env = c.Env
}

cmd.Stdout = c.stdoutWritter
cmd.Stderr = c.stderrWritter
defer func() {
c.stdoutWritter.Close()
c.stderrWritter.Close()
}()
defer c.cancel()

started := time.Now()
defer func() {
resp.Duration = time.Since(started)
}()

timer := time.NewTimer(math.MaxInt64)
defer timer.Stop()
if c.Timeout > 0 {
setProcessGroup(cmd)
timer.Stop()
timer = time.NewTimer(c.Timeout)
if c.timeout > 0 {
time.AfterFunc(c.timeout, c.cancel)
}
if err := cmd.Start(); err != nil {
if err := c.cmd.Start(); err != nil {
// Something unexpected happened when attempting to
// fork/exec, return immediately.
return resp, err
}

waitCh := make(chan struct{})
var err error
go func() {
err = cmd.Wait()
close(waitCh)
}()

// Wait for the process to complete or the timer to trigger, whichever comes first.
var killErr error
select {
case <-waitCh:
if err != nil {
// The command most likely return a non-zero exit status.
if exitError, ok := err.(*exec.ExitError); ok {
// Best effort to determine the exit status, this
// should work on Linux, OSX, and Windows.
if status, ok := exitError.Sys().(syscall.WaitStatus); ok {
resp.Status = status.ExitStatus()
} else {
resp.Status = FallbackExitStatus
}
// Wait for the process to complete then attempt to determine the result
err = c.cmd.Wait()
if err != nil {
// The command most likely return a non-zero exit status.
if exitError, ok := err.(*exec.ExitError); ok {
// Best effort to determine the exit status, this
// should work on Linux, OSX, and Windows.
if status, ok := exitError.Sys().(syscall.WaitStatus); ok {
resp.Status = status.ExitStatus()
} else {
resp.Status = FallbackExitStatus
resp.Error = exitError
}
} else {
// Everything is A-OK.
resp.Status = OKExitStatus
}

case <-timer.C:
var killErrOutput string
if killErr = killProcess(cmd); killErr != nil {
killErrOutput = fmt.Sprintf("Unable to TERM/KILL the process: #%d\n", cmd.Process.Pid)
// Probably an I/O error
resp.Status = FallbackExitStatus
resp.Error = err
}
timeout()
fmt.Fprintf(c.stderrWritter, "%s%s", TimeoutOutput, killErrOutput)
resp.Status = TimeoutExitStatus
} else {
// Everything is A-OK.
resp.Status = OKExitStatus
}

return resp, nil
Expand All @@ -197,6 +131,9 @@ type ExecutionResponse struct {

// Duration provides command execution time.
Duration time.Duration

// Error is passed when the outcome of the execution is uncertain
Error error
}

// cError const error type for sentinels
Expand Down
Loading

0 comments on commit c407b49

Please sign in to comment.