Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jobreceiver): Implement the command package #1240

Merged
merged 11 commits into from
Sep 22, 2023
126 changes: 126 additions & 0 deletions pkg/receiver/jobreceiver/command/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package command

import (
"context"
"io"
"os/exec"
"syscall"
"time"
)

const (
// OKExitStatus specifies the command execution exit status
// that indicates a success, A-OK.
OKExitStatus int = 0

// FallbackExitStatus specifies the command execution exit
// status used when golang is unable to determine the exit
// status.
FallbackExitStatus int = 3
)

// ExecutionRequest describes an external command to be executed.
type ExecutionRequest struct {
// Command is the command to be executed. This is the only field that must
// be set non-zero. Behaves like os/exec.Cmd's Path field.
Command string

// Arguments to execute the command with.
Arguments []string

// Env variables to include
Env []string

// Timeout when set non-zero functions as a timer for starting and running
// a command on Execution.Run
Timeout time.Duration
}

// NewExecution based on an ExecutionRequest
func NewExecution(ctx context.Context, request ExecutionRequest) *Execution {
ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, request.Command, request.Arguments...)
if len(request.Env) > 0 {
cmd.Env = request.Env
}

setOptions(cmd)
return &Execution{
cmd: cmd,
ctx: ctx,
cancel: cancel,
timeout: request.Timeout,
}
}

type Execution struct {
cmd *exec.Cmd
ctx context.Context
cancel func()
timeout time.Duration
}

// Stdout and Stderr return Readers for the underlying execution's output
// streams. Run must be called subsequently or file descriptors may be leaked.
func (c *Execution) Stdout() (io.Reader, error) {
return c.cmd.StdoutPipe()
}
func (c *Execution) Stderr() (io.Reader, error) {
return c.cmd.StderrPipe()
}

// Run the command. May only be invoked once.
func (c *Execution) Run() (resp ExecutionResponse, err error) {
defer c.cancel()

started := time.Now()
defer func() {
resp.Duration = time.Since(started)
}()

if c.timeout > 0 {
time.AfterFunc(c.timeout, c.cancel)
}
if err := c.cmd.Start(); err != nil {
// Something unexpected happened when attempting to
// fork/exec, return immediately.
return resp, err
}

// Wait for the process to complete then attempt to determine the result
err = c.cmd.Wait()
if err != nil {
// The command most likely return a non-zero exit status.
if exitError, ok := err.(*exec.ExitError); ok {
// Best effort to determine the exit status, this
// should work on Linux, OSX, and Windows.
if status, ok := exitError.Sys().(syscall.WaitStatus); ok {
resp.Status = status.ExitStatus()
} else {
resp.Status = FallbackExitStatus
resp.Error = exitError
}
} else {
// Probably an I/O error
resp.Status = FallbackExitStatus
resp.Error = err
}
} else {
// Everything is A-OK.
resp.Status = OKExitStatus
}

return resp, nil
}

// ExecutionResponse provides the response information of an ExecutionRequest.
type ExecutionResponse struct {
// Command execution exit status.
Status int

// Duration provides command execution time.
Duration time.Duration

// Error is passed when the outcome of the execution is uncertain
Error error
}
169 changes: 169 additions & 0 deletions pkg/receiver/jobreceiver/command/command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package command

import (
"bytes"
"context"
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestMain lets the test binary emulate other processes
func TestMain(m *testing.M) {
flag.Parse()

pid := os.Getpid()
if os.Getenv("GO_EXEC_TEST_PID") == "" {
os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid))
os.Exit(m.Run())
}

args := flag.Args()
if len(args) == 0 {
fmt.Fprintf(os.Stderr, "No command\n")
os.Exit(2)
}

command, args := args[0], args[1:]
switch command {
case "echo":
fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " "))
case "exit":
if i, err := strconv.ParseInt(args[0], 10, 32); err == nil {
os.Exit(int(i))
}
panic("unexpected exit argument")
case "sleep":
if d, err := time.ParseDuration(args[0]); err == nil {
time.Sleep(d)
return
}
if i, err := strconv.ParseInt(args[0], 10, 64); err == nil {
time.Sleep(time.Second * time.Duration(i))
return
}
case "fork":
childCommand := NewExecution(context.Background(), ExecutionRequest{
Command: os.Args[0],
Arguments: args,
})
_, err := childCommand.Run()
if err != nil {
fmt.Fprintf(os.Stderr, "fork error: %v", err)
os.Exit(3)
}
}
}

func TestExecute(t *testing.T) {
ctx := context.Background()
// Basic command execution
t.Run("basic", func(t *testing.T) {
echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}}))
outC := eventualOutput(t, echo)
resp, err := echo.Run()
require.NoError(t, err)
assert.Equal(t, 0, resp.Status)
require.NoError(t, err)
assert.Contains(t, <-outC, "hello world")
})

// Command exits non-zero
t.Run("exit code 1", func(t *testing.T) {
exitCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"1"}}))
resp, err := exitCmd.Run()
require.NoError(t, err)
assert.Equal(t, 1, resp.Status)
exitCmd = NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"33"}}))
resp, err = exitCmd.Run()
require.NoError(t, err)
assert.Equal(t, 33, resp.Status)
})

// Command exceeds timeout
t.Run("exceeds timeout", func(t *testing.T) {
timeout := time.Millisecond * 100
sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "sleep", Arguments: []string{"1m"}, Timeout: timeout}))
done := make(chan struct{})
go func() {
resp, err := sleepCmd.Run()
assert.NoError(t, err)
assert.NotEqual(t, OKExitStatus, resp.Status)
assert.LessOrEqual(t, timeout, resp.Duration)
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("command timeout exceeded but was not killed")
case <-done:
// okay
}
})
// Command exceeds timeout with child process
t.Run("exceeds timeout with child", func(t *testing.T) {
timeout := time.Millisecond * 100
sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "fork", Arguments: []string{"sleep", "1m"}, Timeout: timeout}))
done := make(chan struct{})
go func() {
resp, err := sleepCmd.Run()
assert.NoError(t, err)
assert.NotEqual(t, OKExitStatus, resp.Status)
assert.LessOrEqual(t, timeout, resp.Duration)
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Fatal("command timeout exceeded but was not killed")
case <-done:
// okay
}
})

// Invocation cannot be spuriously re-invoked
t.Run("cannot be ran twice", func(t *testing.T) {
echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}}))
outC := eventualOutput(t, echo)
_, err := echo.Run()
require.NoError(t, err)
_, err = echo.Run()
assert.Error(t, err)
assert.Contains(t, <-outC, "hello world")
})
}

// withTestHelper takes an ExecutionRequest and adjusts it to run with the
// test binary. TestMain will handle emulating the command.
func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest {
t.Helper()

request.Arguments = append([]string{request.Command}, request.Arguments...)
request.Command = os.Args[0]
return request
}

func eventualOutput(t *testing.T, i *Execution) <-chan string {
t.Helper()
out := make(chan string, 1)
stdout, err := i.Stdout()
require.NoError(t, err)
stderr, err := i.Stderr()
require.NoError(t, err)
go func() {
var buf bytes.Buffer
_, err := io.Copy(&buf, stdout)
require.NoError(t, err)
_, err = io.Copy(&buf, stderr)
require.NoError(t, err)
out <- buf.String()
close(out)
}()
return out
}
15 changes: 15 additions & 0 deletions pkg/receiver/jobreceiver/command/exec_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package command
swiatekm marked this conversation as resolved.
Show resolved Hide resolved

import (
"os/exec"
"syscall"
)

// setOptions sets the process group of the command processprocgroup
func setOptions(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true, Pdeathsig: syscall.SIGTERM}
cmd.Cancel = func() error {
// Kill process group instead
return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
}
}
18 changes: 18 additions & 0 deletions pkg/receiver/jobreceiver/command/exec_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
//go:build !linux && !windows
// +build !linux,!windows

package command

import (
"os/exec"
"syscall"
)

// setOptions sets the process group of the command process
func setOptions(cmd *exec.Cmd) {
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
cmd.Cancel = func() error {
// Kill process group instead
return syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
}
}
24 changes: 24 additions & 0 deletions pkg/receiver/jobreceiver/command/exec_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package command

import (
"fmt"
"os/exec"
)

// setOptions sets the process group of the command process
func setOptions(cmd *exec.Cmd) {
// Cancel functiona adapted from sensu-go's command package
// TODO(ck) may be worth looking into the windows Job Object api
cmd.Cancel = func() error {
// Try with taskkill first
taskkill := exec.Command(
"taskkill",
"/T", "/F", "/PID", fmt.Sprint(cmd.Process.Pid),
)
if err := taskkill.Run(); err == nil {
return nil
}
// Fall back to the default behavior
return cmd.Process.Kill()
}
}
9 changes: 6 additions & 3 deletions pkg/receiver/jobreceiver/output/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Interface consumes command output and emits telemetry data
type Interface interface {
Consume(stdin, stderr io.Reader) CloseFunc
Consume(ctx context.Context, stdin, stderr io.Reader) CloseFunc
}

// CloseFunc
Expand Down Expand Up @@ -45,8 +45,8 @@ type DemoConsumer struct {
}

// Consume reads stdout line by line and produces entries
func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc {
ctx, cancel := context.WithCancel(context.Background())
func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.Reader) CloseFunc {
ctx, cancel := context.WithCancel(ctx)
go func() {
scanner := bufio.NewScanner(stdout)
for {
Expand All @@ -56,6 +56,9 @@ func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc {
default:
}
if !scanner.Scan() {
if scanner.Err() != nil {
panic(scanner.Err())
}
return
}
ent, err := p.NewEntry(scanner.Text())
Expand Down
Loading