Skip to content

Commit

Permalink
[pkg/reciever/jobreceiver] Implement receiver
Browse files Browse the repository at this point in the history
Implements a feature gated monitoringjob receiver that schedules
and executes commands.

Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse committed Sep 25, 2023
1 parent 055976b commit 8ec26ae
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 94 deletions.
8 changes: 8 additions & 0 deletions pkg/receiver/jobreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ executable at defined intervals, and propagates the output from that process
as log events. In addition, the monitoring job receiver simplifies the process
of downloading runtime assets necessary to run a particular monitoring job.

## Feature Gate

This receiver is currently gated behind a [featuregate][featuregate]
`receiver.monitoringjob.enabled`. To start the collector with monitoringjobs
enabled use the `--feature-gates=receiver.monitoringjob.enabled` flag.

[featuregate]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate

## Configuration

| Configuration | Default | Description
Expand Down
57 changes: 5 additions & 52 deletions pkg/receiver/jobreceiver/command/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,64 +3,18 @@ package command
import (
"bytes"
"context"
"flag"
"fmt"
"io"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/internal/commandtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// TestMain lets the test binary emulate other processes
func TestMain(m *testing.M) {
flag.Parse()

pid := os.Getpid()
if os.Getenv("GO_EXEC_TEST_PID") == "" {
os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid))
os.Exit(m.Run())
}

args := flag.Args()
if len(args) == 0 {
fmt.Fprintf(os.Stderr, "No command\n")
os.Exit(2)
}

command, args := args[0], args[1:]
switch command {
case "echo":
fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " "))
case "exit":
if i, err := strconv.ParseInt(args[0], 10, 32); err == nil {
os.Exit(int(i))
}
panic("unexpected exit argument")
case "sleep":
if d, err := time.ParseDuration(args[0]); err == nil {
time.Sleep(d)
return
}
if i, err := strconv.ParseInt(args[0], 10, 64); err == nil {
time.Sleep(time.Second * time.Duration(i))
return
}
case "fork":
childCommand := NewExecution(context.Background(), ExecutionRequest{
Command: os.Args[0],
Arguments: args,
})
_, err := childCommand.Run()
if err != nil {
fmt.Fprintf(os.Stderr, "fork error: %v", err)
os.Exit(3)
}
}
commandtest.WrapTestMain(m)
}

func TestExecute(t *testing.T) {
Expand Down Expand Up @@ -141,12 +95,11 @@ func TestExecute(t *testing.T) {

// withTestHelper takes an ExecutionRequest and adjusts it to run with the
// test binary. TestMain will handle emulating the command.
func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest {
func withTestHelper(t *testing.T, r ExecutionRequest) ExecutionRequest {
t.Helper()

request.Arguments = append([]string{request.Command}, request.Arguments...)
request.Command = os.Args[0]
return request
r.Command, r.Arguments = commandtest.WrapCommand(r.Command, r.Arguments)
return r
}

func eventualOutput(t *testing.T, i *Execution) <-chan string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/receiver/jobreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Config struct {
Output output.Config `mapstructure:"output"`
}

// ExecutionConfig defines the configuration for execution of a monitorinjob
// ExecutionConfig defines the configuration for execution of a monitoringjob
// process
type ExecutionConfig struct {
// Command is the name of the binary to be executed
Expand Down
2 changes: 1 addition & 1 deletion pkg/receiver/jobreceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/component v0.85.0
go.opentelemetry.io/collector/confmap v0.85.0
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014
go.opentelemetry.io/collector/receiver v0.85.0
go.uber.org/zap v1.25.0
)
Expand Down Expand Up @@ -38,7 +39,6 @@ require (
go.opentelemetry.io/collector/consumer v0.85.0 // indirect
go.opentelemetry.io/collector/exporter v0.85.0 // indirect
go.opentelemetry.io/collector/extension v0.85.0 // indirect
go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect
go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 // indirect
go.opentelemetry.io/collector/processor v0.85.0 // indirect
go.opentelemetry.io/otel v1.17.0 // indirect
Expand Down
69 changes: 69 additions & 0 deletions pkg/receiver/jobreceiver/internal/commandtest/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package commandtest

import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"strconv"
"strings"
"testing"
"time"
)

// WrapTestMain can be used in TestMain to wrap the go test binary with basic
// command emulation, normalizing the test environment across platforms.
//
// Usage:
//
// func TestMain(m *testing.M) {
// commandtest.WrapTestMain(m)
// }
func WrapTestMain(m *testing.M) {
flag.Parse()

pid := os.Getpid()
if os.Getenv("GO_EXEC_TEST_PID") == "" {
os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid))
os.Exit(m.Run())
}

args := flag.Args()
if len(args) == 0 {
fmt.Fprintf(os.Stderr, "No command\n")
os.Exit(2)
}

command, args := args[0], args[1:]
switch command {
case "echo":
fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " "))
case "exit":
if i, err := strconv.ParseInt(args[0], 10, 32); err == nil {
os.Exit(int(i))
}
panic("unexpected exit argument")
case "sleep":
if d, err := time.ParseDuration(args[0]); err == nil {
time.Sleep(d)
return
}
if i, err := strconv.ParseInt(args[0], 10, 64); err == nil {
time.Sleep(time.Second * time.Duration(i))
return
}
case "fork":
childCommand := exec.CommandContext(context.Background(), os.Args[0], args...)
if err := childCommand.Run(); err != nil {
fmt.Fprintf(os.Stderr, "fork error: %v", err)
os.Exit(3)
}
}
}

// WrapCommand adjusts a command and arguments to run emuldated by a go test
// binary wrapped with WrapTestMain
func WrapCommand(cmd string, args []string) (string, []string) {
return os.Args[0], append([]string{cmd}, args...)
}
8 changes: 5 additions & 3 deletions pkg/receiver/jobreceiver/output/logentries/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"io"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
"go.uber.org/zap"
Expand Down Expand Up @@ -35,11 +35,11 @@ type LogEntriesConfig struct {

func (c *LogEntriesConfig) Build(logger *zap.SugaredLogger, op consumer.WriterOp) (consumer.Interface, error) {

encoding, err := decoder.LookupEncoding(c.Encoding)
encoding, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, fmt.Errorf("log_entries configuration unable to use encoding %s: %w", c.Encoding, err)
}
splitFunc, err := c.Multiline.Build(encoding, true, true, true, nil, int(c.MaxLogSize))
splitFunc, err := c.Multiline.Func(encoding, true, int(c.MaxLogSize), nopTrim)
if err != nil {
return nil, fmt.Errorf("log_entries configuration could not build split function: %w", err)
}
Expand Down Expand Up @@ -95,3 +95,5 @@ func (f scannerFactory) splitWithTruncate() bufio.SplitFunc {
return
}
}

func nopTrim(b []byte) []byte { return b }
81 changes: 81 additions & 0 deletions pkg/receiver/jobreceiver/receiver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package jobreceiver

import (
"context"
"testing"
"time"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/internal/commandtest"
"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/receiver/receivertest"
)

// TestMain enable command emulation from commandtest
func TestMain(m *testing.M) {
commandtest.WrapTestMain(m)
}

func TestMonitoringJob(t *testing.T) {
// basic test
require.NoError(t, featuregate.GlobalRegistry().Set(featureEnabledId, true))
t.Cleanup(func() { require.NoError(t, featuregate.GlobalRegistry().Set(featureEnabledId, false)) })

f := NewFactory()
cfg := testdataConfigSimple()

sink := new(consumertest.LogsSink)

rec, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
require.NoError(t, err)

require.NoError(t, rec.Start(context.Background(), componenttest.NewNopHost()))

if !assert.Eventually(t, expectNLogs(sink, 1), time.Second*5, time.Millisecond*50, "expected one log entry") {
t.Fatalf("actual %d, %v", sink.LogRecordCount(), sink.AllLogs())
}
require.NoError(t, rec.Shutdown(context.Background()))

first := sink.AllLogs()[0]
firstRecord := first.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
assert.Equal(t, "hello world", firstRecord.Body().AsString())

t.Run("disabled by feature gate", func(t *testing.T) {
require.NoError(t, featuregate.GlobalRegistry().Set(featureEnabledId, false))

f := NewFactory()
cfg := testdataConfigSimple()

sink := new(consumertest.LogsSink)

rec, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink)
require.NoError(t, err)

require.NoError(t, rec.Start(context.Background(), componenttest.NewNopHost()))
// TODO(ck) bleh.
<-time.After(time.Millisecond * 1500)

assert.Equal(t, 0, sink.LogRecordCount())
})
}

func testdataConfigSimple() *Config {
cfg := &Config{
Exec: newDefaultExecutionConfig(),
Schedule: ScheduleConfig{Interval: time.Millisecond * 100},
Output: output.NewDefaultConfig(),
}
cmd, args := commandtest.WrapCommand("echo", []string{"hello world"})
cfg.Exec.Command, cfg.Exec.Arguments = cmd, args
return cfg
}

func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool {
return func() bool {
return expected <= sink.LogRecordCount()
}
}
Loading

0 comments on commit 8ec26ae

Please sign in to comment.