diff --git a/pkg/receiver/jobreceiver/README.md b/pkg/receiver/jobreceiver/README.md index db847a9db6..0dbb9f91bc 100644 --- a/pkg/receiver/jobreceiver/README.md +++ b/pkg/receiver/jobreceiver/README.md @@ -10,6 +10,14 @@ executable at defined intervals, and propagates the output from that process as log events. In addition, the monitoring job receiver simplifies the process of downloading runtime assets necessary to run a particular monitoring job. +## Feature Gate + +This receiver is currently gated behind a [featuregate][featuregate] +`receiver.monitoringjob.enabled`. To start the collector with monitoringjobs +enabled use the `--feature-gates=receiver.monitoringjob.enabled` flag. + +[featuregate]: https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate + ## Configuration | Configuration | Default | Description diff --git a/pkg/receiver/jobreceiver/command/command_test.go b/pkg/receiver/jobreceiver/command/command_test.go index 2eb99c3c53..d981a6fba2 100644 --- a/pkg/receiver/jobreceiver/command/command_test.go +++ b/pkg/receiver/jobreceiver/command/command_test.go @@ -3,64 +3,18 @@ package command import ( "bytes" "context" - "flag" - "fmt" "io" - "os" - "strconv" - "strings" "testing" "time" + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/internal/commandtest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) // TestMain lets the test binary emulate other processes func TestMain(m *testing.M) { - flag.Parse() - - pid := os.Getpid() - if os.Getenv("GO_EXEC_TEST_PID") == "" { - os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid)) - os.Exit(m.Run()) - } - - args := flag.Args() - if len(args) == 0 { - fmt.Fprintf(os.Stderr, "No command\n") - os.Exit(2) - } - - command, args := args[0], args[1:] - switch command { - case "echo": - fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " ")) - case "exit": - if i, err := strconv.ParseInt(args[0], 10, 32); err == nil { - os.Exit(int(i)) - } - panic("unexpected exit argument") - case "sleep": - if d, err := time.ParseDuration(args[0]); err == nil { - time.Sleep(d) - return - } - if i, err := strconv.ParseInt(args[0], 10, 64); err == nil { - time.Sleep(time.Second * time.Duration(i)) - return - } - case "fork": - childCommand := NewExecution(context.Background(), ExecutionRequest{ - Command: os.Args[0], - Arguments: args, - }) - _, err := childCommand.Run() - if err != nil { - fmt.Fprintf(os.Stderr, "fork error: %v", err) - os.Exit(3) - } - } + commandtest.WrapTestMain(m) } func TestExecute(t *testing.T) { @@ -141,12 +95,11 @@ func TestExecute(t *testing.T) { // withTestHelper takes an ExecutionRequest and adjusts it to run with the // test binary. TestMain will handle emulating the command. -func withTestHelper(t *testing.T, request ExecutionRequest) ExecutionRequest { +func withTestHelper(t *testing.T, r ExecutionRequest) ExecutionRequest { t.Helper() - request.Arguments = append([]string{request.Command}, request.Arguments...) - request.Command = os.Args[0] - return request + r.Command, r.Arguments = commandtest.WrapCommand(r.Command, r.Arguments) + return r } func eventualOutput(t *testing.T, i *Execution) <-chan string { diff --git a/pkg/receiver/jobreceiver/config.go b/pkg/receiver/jobreceiver/config.go index 9c47468ce9..704e5f8753 100644 --- a/pkg/receiver/jobreceiver/config.go +++ b/pkg/receiver/jobreceiver/config.go @@ -16,7 +16,7 @@ type Config struct { Output output.Config `mapstructure:"output"` } -// ExecutionConfig defines the configuration for execution of a monitorinjob +// ExecutionConfig defines the configuration for execution of a monitoringjob // process type ExecutionConfig struct { // Command is the name of the binary to be executed diff --git a/pkg/receiver/jobreceiver/go.mod b/pkg/receiver/jobreceiver/go.mod index 27deb52f99..e7807c102f 100644 --- a/pkg/receiver/jobreceiver/go.mod +++ b/pkg/receiver/jobreceiver/go.mod @@ -7,6 +7,7 @@ require ( github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/component v0.85.0 go.opentelemetry.io/collector/confmap v0.85.0 + go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 go.opentelemetry.io/collector/receiver v0.85.0 go.uber.org/zap v1.25.0 ) @@ -38,7 +39,6 @@ require ( go.opentelemetry.io/collector/consumer v0.85.0 // indirect go.opentelemetry.io/collector/exporter v0.85.0 // indirect go.opentelemetry.io/collector/extension v0.85.0 // indirect - go.opentelemetry.io/collector/featuregate v1.0.0-rcv0014 // indirect go.opentelemetry.io/collector/pdata v1.0.0-rcv0014 // indirect go.opentelemetry.io/collector/processor v0.85.0 // indirect go.opentelemetry.io/otel v1.17.0 // indirect diff --git a/pkg/receiver/jobreceiver/internal/commandtest/command.go b/pkg/receiver/jobreceiver/internal/commandtest/command.go new file mode 100644 index 0000000000..189ceb9252 --- /dev/null +++ b/pkg/receiver/jobreceiver/internal/commandtest/command.go @@ -0,0 +1,69 @@ +package commandtest + +import ( + "context" + "flag" + "fmt" + "os" + "os/exec" + "strconv" + "strings" + "testing" + "time" +) + +// WrapTestMain can be used in TestMain to wrap the go test binary with basic +// command emulation, normalizing the test environment across platforms. +// +// Usage: +// +// func TestMain(m *testing.M) { +// commandtest.WrapTestMain(m) +// } +func WrapTestMain(m *testing.M) { + flag.Parse() + + pid := os.Getpid() + if os.Getenv("GO_EXEC_TEST_PID") == "" { + os.Setenv("GO_EXEC_TEST_PID", strconv.Itoa(pid)) + os.Exit(m.Run()) + } + + args := flag.Args() + if len(args) == 0 { + fmt.Fprintf(os.Stderr, "No command\n") + os.Exit(2) + } + + command, args := args[0], args[1:] + switch command { + case "echo": + fmt.Fprintf(os.Stdout, "%s", strings.Join(args, " ")) + case "exit": + if i, err := strconv.ParseInt(args[0], 10, 32); err == nil { + os.Exit(int(i)) + } + panic("unexpected exit argument") + case "sleep": + if d, err := time.ParseDuration(args[0]); err == nil { + time.Sleep(d) + return + } + if i, err := strconv.ParseInt(args[0], 10, 64); err == nil { + time.Sleep(time.Second * time.Duration(i)) + return + } + case "fork": + childCommand := exec.CommandContext(context.Background(), os.Args[0], args...) + if err := childCommand.Run(); err != nil { + fmt.Fprintf(os.Stderr, "fork error: %v", err) + os.Exit(3) + } + } +} + +// WrapCommand adjusts a command and arguments to run emuldated by a go test +// binary wrapped with WrapTestMain +func WrapCommand(cmd string, args []string) (string, []string) { + return os.Args[0], append([]string{cmd}, args...) +} diff --git a/pkg/receiver/jobreceiver/output/logentries/config.go b/pkg/receiver/jobreceiver/output/logentries/config.go index 6b6474e274..d16245583b 100644 --- a/pkg/receiver/jobreceiver/output/logentries/config.go +++ b/pkg/receiver/jobreceiver/output/logentries/config.go @@ -6,7 +6,7 @@ import ( "io" "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "go.uber.org/zap" @@ -35,11 +35,11 @@ type LogEntriesConfig struct { func (c *LogEntriesConfig) Build(logger *zap.SugaredLogger, op consumer.WriterOp) (consumer.Interface, error) { - encoding, err := decoder.LookupEncoding(c.Encoding) + encoding, err := decode.LookupEncoding(c.Encoding) if err != nil { return nil, fmt.Errorf("log_entries configuration unable to use encoding %s: %w", c.Encoding, err) } - splitFunc, err := c.Multiline.Build(encoding, true, true, true, nil, int(c.MaxLogSize)) + splitFunc, err := c.Multiline.Func(encoding, true, int(c.MaxLogSize), nopTrim) if err != nil { return nil, fmt.Errorf("log_entries configuration could not build split function: %w", err) } @@ -95,3 +95,5 @@ func (f scannerFactory) splitWithTruncate() bufio.SplitFunc { return } } + +func nopTrim(b []byte) []byte { return b } diff --git a/pkg/receiver/jobreceiver/receiver_test.go b/pkg/receiver/jobreceiver/receiver_test.go new file mode 100644 index 0000000000..40decd2aa1 --- /dev/null +++ b/pkg/receiver/jobreceiver/receiver_test.go @@ -0,0 +1,82 @@ +package jobreceiver + +import ( + "context" + "testing" + "time" + + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/internal/commandtest" + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/receiver/receivertest" +) + +// TestMain enable command emulation from commandtest +func TestMain(m *testing.M) { + commandtest.WrapTestMain(m) +} + +func TestMonitoringJob(t *testing.T) { + // basic test + err := featuregate.GlobalRegistry().Set(featureEnabledId, true) + require.NoError(t, err) + + f := NewFactory() + cfg := testdataConfigSimple() + + sink := new(consumertest.LogsSink) + + rec, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink) + require.NoError(t, err) + + rec.Start(context.Background(), componenttest.NewNopHost()) + + if !assert.Eventually(t, expectNLogs(sink, 1), time.Second*5, time.Millisecond*50, "expected one log entry") { + t.Fatalf("actual %d, %v", sink.LogRecordCount(), sink.AllLogs()) + } + require.NoError(t, rec.Shutdown(context.Background())) + + first := sink.AllLogs()[0] + firstRecord := first.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0) + assert.Equal(t, "hello world", firstRecord.Body().AsString()) + + t.Run("disabled by feature gate", func(t *testing.T) { + err := featuregate.GlobalRegistry().Set(featureEnabledId, false) + require.NoError(t, err) + + f := NewFactory() + cfg := testdataConfigSimple() + + sink := new(consumertest.LogsSink) + + rec, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, sink) + require.NoError(t, err) + + rec.Start(context.Background(), componenttest.NewNopHost()) + // TODO(ck) bleh. + <-time.After(time.Millisecond * 1500) + + assert.Equal(t, 0, sink.LogRecordCount()) + }) +} + +func testdataConfigSimple() *Config { + cfg := &Config{ + Exec: newDefaultExecutionConfig(), + Schedule: ScheduleConfig{Interval: time.Millisecond * 100}, + Output: output.NewDefaultConfig(), + } + cmd, args := commandtest.WrapCommand("echo", []string{"hello world"}) + cfg.Exec.Command, cfg.Exec.Arguments = cmd, args + return cfg +} + +func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool { + return func() bool { + return expected <= sink.LogRecordCount() + } +} diff --git a/pkg/receiver/jobreceiver/runner.go b/pkg/receiver/jobreceiver/runner.go index 5a82ae661e..2094265df4 100644 --- a/pkg/receiver/jobreceiver/runner.go +++ b/pkg/receiver/jobreceiver/runner.go @@ -2,64 +2,120 @@ package jobreceiver import ( "context" + "sync" + "time" "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/builder" "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/command" "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" + "go.opentelemetry.io/collector/featuregate" "go.uber.org/zap" ) +const ( + featureEnabledId = "receiver.monitoringjob.enabled" + featureEnabledStage = featuregate.StageAlpha + featureEnabledDescription = "When enabled, the collector will schedule monitoring jobs." +) + +var enabledGate *featuregate.Gate + +func init() { + enabledGate = featuregate.GlobalRegistry().MustRegister( + featureEnabledId, + featureEnabledStage, + featuregate.WithRegisterDescription(featureEnabledDescription), + ) +} + // Build returns the job runner, the process responsible for scheduling and // running commands and piping their output to the output consumer. func (c Config) Build(logger *zap.SugaredLogger, out consumer.Interface) (builder.JobRunner, error) { - return &stubRunner{ - Exec: c.Exec, - Consumer: out, + if !enabledGate.IsEnabled() { + logger.Warn("monitoringjob feature is not enabled, will not run.") + return &nopRunner{}, nil + } + return &runner{ + exec: c.Exec, + schedule: c.Schedule, + consumer: out, }, nil } -// stubRunner is a stub implementation. -type stubRunner struct { - Exec ExecutionConfig - Consumer consumer.Interface +// runner schedules and executes commands +type runner struct { + exec ExecutionConfig + schedule ScheduleConfig + consumer consumer.Interface + + logger *zap.SugaredLogger + + wg sync.WaitGroup + cancel func() } // Start stub impl. runs command once at startup then idles indefinitely. -func (r *stubRunner) Start(operator.Persister) error { +func (r *runner) Start(operator.Persister) error { + r.wg.Add(1) + + ctx := context.WithValue(context.Background(), consumer.ContextKeyCommandName, r.exec.Command) + ctx, r.cancel = context.WithCancel(ctx) go func() { - ctx := context.Background() - ctx = context.WithValue(ctx, consumer.ContextKeyCommandName, r.Exec.Command) - cmd := command.NewExecution(ctx, command.ExecutionRequest{ - Command: r.Exec.Command, - Arguments: r.Exec.Arguments, - Timeout: r.Exec.Timeout, - }) - - stdout, err := cmd.Stdout() - if err != nil { - panic(err) - } - stderr, err := cmd.Stderr() - if err != nil { - panic(err) - } - cb := r.Consumer.Consume(ctx, stdout, stderr) + defer r.wg.Done() + + // TODO(ck) spec using persistence for interval timing. + ticker := time.NewTicker(r.schedule.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } - resp, err := cmd.Run() - if err != nil { - panic(err) + cmd := command.NewExecution(ctx, command.ExecutionRequest{ + Command: r.exec.Command, + Arguments: r.exec.Arguments, + Timeout: r.exec.Timeout, + }) + + stdout, err := cmd.Stdout() + if err != nil { + r.logger.Errorf("monitoringjob runner failed to create command Stdout pipe: %s", err) + continue + } + stderr, err := cmd.Stderr() + if err != nil { + r.logger.Errorf("monitoringjob runner failed to create command Stderr pipe: %s", err) + continue + } + cb := r.consumer.Consume(ctx, stdout, stderr) + + resp, err := cmd.Run() + if err != nil { + r.logger.Errorf("monitoringjob runner failed to run command: %s", err) + continue + } + cb(consumer.ExecutionSummary{ + Command: r.exec.Command, + ExitCode: resp.Status, + RunDuration: resp.Duration, + }) } - cb(consumer.ExecutionSummary{ - Command: r.Exec.Command, - ExitCode: resp.Status, - RunDuration: resp.Duration, - }) }() return nil } -func (r *stubRunner) Stop() error { +func (r *runner) Stop() error { + r.cancel() + r.wg.Wait() return nil } + +type nopRunner struct{} + +func (_ *nopRunner) Start(operator.Persister) error { return nil } +func (_ *nopRunner) Stop() error { return nil } diff --git a/pkg/receiver/jobreceiver/testdata/usage.yaml b/pkg/receiver/jobreceiver/testdata/usage.yaml index 6e8f9748ea..b9d5bb47ad 100644 --- a/pkg/receiver/jobreceiver/testdata/usage.yaml +++ b/pkg/receiver/jobreceiver/testdata/usage.yaml @@ -2,7 +2,7 @@ receivers: monitoringjob/log: schedule: - interval: 1h + interval: 1m exec: command: /bin/sh timeout: 1m @@ -41,7 +41,7 @@ receivers: monitoringjob/event: schedule: - interval: 1h + interval: 10s exec: command: uptime timeout: 1m @@ -79,7 +79,7 @@ processors: service: pipelines: - logs/monitorinjob: + logs/monitoringjob: receivers: - monitoringjob/event - monitoringjob/log