Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jobreceiver): Implement the command package #1240

Merged
merged 11 commits into from
Sep 22, 2023
144 changes: 144 additions & 0 deletions pkg/receiver/jobreceiver/command/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package command

import (
"context"
"io"
"os/exec"
"sync"
"syscall"
"time"
)

const (
// OKExitStatus specifies the command execution exit status
// that indicates a success, A-OK.
OKExitStatus int = 0

// FallbackExitStatus specifies the command execution exit
// status used when golang is unable to determine the exit
// status.
FallbackExitStatus int = 3

// ErrAlreadyExecuted don't do that.
c-kruse marked this conversation as resolved.
Show resolved Hide resolved
ErrAlreadyExecuted = cError("invocation has already been started once")
)

// ExecutionRequest
c-kruse marked this conversation as resolved.
Show resolved Hide resolved
type ExecutionRequest struct {
// Command is the command to be executed.
Command string

// Arguments to execute the command with.
Arguments []string

// Env variables to include
Env []string

// Execution timeout
Timeout time.Duration
}

// NewExecution based on an ExecutionRequest
func NewExecution(ctx context.Context, request ExecutionRequest) *Execution {
ctx, cancel := context.WithCancel(ctx)
cmd := exec.CommandContext(ctx, request.Command, request.Arguments...)
if len(request.Env) > 0 {
cmd.Env = request.Env
}
return &Execution{
cmd: cmd,
ctx: ctx,
cancel: cancel,
timeout: request.Timeout,
}
}

type Execution struct {
cmd *exec.Cmd
ctx context.Context
cancel func()
timeout time.Duration

mu sync.Mutex
started bool
}

// Stdout and Stderr return Readers for the underlying execution's output
// streams. Run must be called subsequently or file descriptors may be leaked.
func (c *Execution) Stdout() (io.Reader, error) {
return c.cmd.StdoutPipe()
}
func (c *Execution) Stderr() (io.Reader, error) {
return c.cmd.StderrPipe()
}

// Run the command. May only be invoked once.
func (c *Execution) Run() (resp ExecutionResponse, err error) {
c.mu.Lock()
if c.started {
defer c.mu.Unlock()
return resp, ErrAlreadyExecuted
}
c.started = true
c.mu.Unlock()

defer c.cancel()

started := time.Now()
defer func() {
resp.Duration = time.Since(started)
}()

if c.timeout > 0 {
time.AfterFunc(c.timeout, c.cancel)
}
if err := c.cmd.Start(); err != nil {
// Something unexpected happened when attempting to
// fork/exec, return immediately.
return resp, err
}

// Wait for the process to complete then attempt to determine the result
err = c.cmd.Wait()
if err != nil {
// The command most likely return a non-zero exit status.
if exitError, ok := err.(*exec.ExitError); ok {
// Best effort to determine the exit status, this
// should work on Linux, OSX, and Windows.
if status, ok := exitError.Sys().(syscall.WaitStatus); ok {
resp.Status = status.ExitStatus()
} else {
resp.Status = FallbackExitStatus
resp.Error = exitError
}
} else {
// Probably an I/O error
resp.Status = FallbackExitStatus
resp.Error = err
}
} else {
// Everything is A-OK.
resp.Status = OKExitStatus
}

return resp, nil
}

// ExecutionResponse provides the response information of an ExecutionRequest.
type ExecutionResponse struct {
// Command execution exit status.
Status int

// Duration provides command execution time.
Duration time.Duration

// Error is passed when the outcome of the execution is uncertain
Error error
}

// cError const error type for sentinels
type cError string

func (e cError) Error() string {
return string(e)
}
183 changes: 183 additions & 0 deletions pkg/receiver/jobreceiver/command/command_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package command

import (
"bytes"
"context"
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestMain lets the test binary emulate other processes
func TestMain(m *testing.M) {
flag.Parse()

pid := os.Getpid()
if os.Getenv("GO_EXEC_TEST_PID") == "" {
os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid))
os.Exit(m.Run())
}

args := flag.Args()
if len(args) == 0 {
fmt.Fprintf(os.Stderr, "No command\n")
os.Exit(2)
}

command, args := args[0], args[1:]
switch command {
case "echo":
fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " "))
case "exit":
if i, err := strconv.ParseInt(args[0], 10, 32); err == nil {
os.Exit(int(i))
}
panic("unexpected exit argument")
case "sleep":
if d, err := time.ParseDuration(args[0]); err == nil {
time.Sleep(d)
return
}
if i, err := strconv.ParseInt(args[0], 10, 64); err == nil {
time.Sleep(time.Second * time.Duration(i))
return
}
case "fork":
childCommand := NewExecution(context.Background(), ExecutionRequest{
Command: os.Args[0],
Arguments: args,
})
_, err := childCommand.Run()
if err != nil {
fmt.Fprintf(os.Stderr, "fork error: %v", err)
os.Exit(3)
}
}
}

//nolint:all
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
func TestExecute(t *testing.T) {
ctx := context.Background()
// Basic command execution
t.Run("basic", func(t *testing.T) {
echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}}))
outC := eventualOutput(echo)
resp, err := echo.Run()
require.NoError(t, err)
assert.Equal(t, 0, resp.Status)
require.NoError(t, err)
assert.Contains(t, <-outC, "hello world")
})

// Command exits non-zero
t.Run("exit code 1", func(t *testing.T) {
exitCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"1"}}))
resp, err := exitCmd.Run()
require.NoError(t, err)
assert.Equal(t, 1, resp.Status)
exitCmd = NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "exit", Arguments: []string{"33"}}))
resp, err = exitCmd.Run()
require.NoError(t, err)
assert.Equal(t, 33, resp.Status)
})

// Command exceeds timeout
t.Run("exceeds timeout", func(t *testing.T) {
timeout := time.Millisecond * 100
sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "sleep", Arguments: []string{"1m"}, Timeout: timeout}))
done := make(chan struct{})
go func() {
resp, err := sleepCmd.Run()
assert.NoError(t, err)
assert.NotEqual(t, OKExitStatus, resp.Status)
assert.LessOrEqual(t, timeout, resp.Duration)
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Errorf("command timeout exceeded but was not killed")
case <-done:
// okay
}
})
// Command exceeds timeout with child process
t.Run("exceeds timeout with child", func(t *testing.T) {
timeout := time.Millisecond * 100
sleepCmd := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "fork", Arguments: []string{"sleep", "1m"}, Timeout: timeout}))
done := make(chan struct{})
go func() {
resp, err := sleepCmd.Run()
assert.NoError(t, err)
assert.NotEqual(t, OKExitStatus, resp.Status)
assert.LessOrEqual(t, timeout, resp.Duration)
close(done)
}()
select {
case <-time.After(5 * time.Second):
t.Fatal("command timeout exceeded but was not killed")
case <-done:
// okay
}
})

// Invocation cannot be spuriously re-invoked
t.Run("cannot be ran twice", func(t *testing.T) {
echo := NewExecution(ctx, withTestHelper(t, ExecutionRequest{Command: "echo", Arguments: []string{"hello", "world"}}))
outC := eventualOutput(echo)
echo.Run()
_, err := echo.Run()
assert.ErrorIs(t, ErrAlreadyExecuted, err)
assert.Contains(t, <-outC, "hello world")
})
}

// withTestHelper takes an ExecutionRequest and adjusts it to run with the
// test binary. TestMain will handle emulating the command.
func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest {
t.Helper()

request.Arguments = append([]string{request.Command}, request.Arguments...)
request.Command = os.Args[0]
return request
}

//nolint:all
func eventualOutput(i *Execution) <-chan string {
out := make(chan string, 1)
stdout, _ := i.Stdout()
stderr, _ := i.Stderr()
go func() {
var buf syncBuffer
io.Copy(&buf, stdout)
io.Copy(&buf, stderr)
swiatekm marked this conversation as resolved.
Show resolved Hide resolved
out <- buf.String()
close(out)
}()
return out
}

type syncBuffer struct {
buf bytes.Buffer
mu sync.Mutex
}

func (s *syncBuffer) Write(p []byte) (n int, err error) {
s.mu.Lock()
defer s.mu.Unlock()
return s.buf.Write(p)
}

func (s *syncBuffer) String() string {
s.mu.Lock()
defer s.mu.Unlock()
return s.buf.String()
}
9 changes: 6 additions & 3 deletions pkg/receiver/jobreceiver/output/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

// Interface consumes command output and emits telemetry data
type Interface interface {
Consume(stdin, stderr io.Reader) CloseFunc
Consume(ctx context.Context, stdin, stderr io.Reader) CloseFunc
}

// CloseFunc
Expand Down Expand Up @@ -45,8 +45,8 @@ type DemoConsumer struct {
}

// Consume reads stdout line by line and produces entries
func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc {
ctx, cancel := context.WithCancel(context.Background())
func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.Reader) CloseFunc {
ctx, cancel := context.WithCancel(ctx)
go func() {
scanner := bufio.NewScanner(stdout)
for {
Expand All @@ -56,6 +56,9 @@ func (p *DemoConsumer) Consume(stdout, _ io.Reader) CloseFunc {
default:
}
if !scanner.Scan() {
if scanner.Err() != nil {
panic(scanner.Err())
}
return
}
ent, err := p.NewEntry(scanner.Text())
Expand Down
Loading