diff --git a/pkg/receiver/jobreceiver/output/event/config.go b/pkg/receiver/jobreceiver/output/event/config.go index 49cb6be6c5..010a306157 100644 --- a/pkg/receiver/jobreceiver/output/event/config.go +++ b/pkg/receiver/jobreceiver/output/event/config.go @@ -6,6 +6,12 @@ import ( "go.uber.org/zap" ) +const ( + commandNameLabel = "command.name" + commandStatusLabel = "command.status" + commandDurationLabel = "command.duration" +) + // EventConfig handles output as if it is a monitoring job. // Should emit a single event per command execution summarizing the execution. type EventConfig struct { @@ -22,7 +28,7 @@ type EventConfig struct { } func (c *EventConfig) Build(logger *zap.SugaredLogger, op consumer.WriterOp) (consumer.Interface, error) { - return &consumer.DemoConsumer{WriterOp: op, Logger: logger}, nil + return &handler{writer: op, logger: logger, config: *c}, nil } type EventConfigFactory struct{} diff --git a/pkg/receiver/jobreceiver/output/event/handler.go b/pkg/receiver/jobreceiver/output/event/handler.go new file mode 100644 index 0000000000..d24c1761bb --- /dev/null +++ b/pkg/receiver/jobreceiver/output/event/handler.go @@ -0,0 +1,116 @@ +package event + +import ( + "bytes" + "context" + "errors" + "io" + "sync" + + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" + "go.uber.org/zap" +) + +var errSizeLimitExceeded = errors.New("buffer size limit exceeded") + +type handler struct { + logger *zap.SugaredLogger + writer consumer.WriterOp + + config EventConfig +} + +var _ consumer.Interface = (*handler)(nil) + +// Consume +func (h *handler) Consume(ctx context.Context, stdout, stderr io.Reader) consumer.CloseFunc { + e := eventOutputBuffer{ + ctx: ctx, + logger: h.logger, + writer: h.writer, + EventConfig: h.config, + } + e.Start(stdout, stderr) + return e.Close +} + +type eventOutputBuffer struct { + EventConfig + + ctx context.Context + logger *zap.SugaredLogger + writer consumer.WriterOp + + wg sync.WaitGroup + mu sync.Mutex + buf bytes.Buffer +} + +// Start consuming the intput streams into the buffer +func (b *eventOutputBuffer) Start(stdout, stderr io.Reader) { + b.wg.Add(2) + go b.consume(stdout) + go b.consume(stderr) +} + +// consume reads the input into the buffer until either EOF is reached or the +// buffer is full. Once the buffer is full, consume discards remaining input +// until EOF or read error +func (b *eventOutputBuffer) consume(in io.Reader) { + defer b.wg.Done() + + _, err := io.Copy(b, in) + if errors.Is(err, errSizeLimitExceeded) { + _, err = io.Copy(io.Discard, in) + } + if err != nil { + b.logger.Errorf("io error consuming event input: %s", err) + } +} + +// Close builds a new log entry based off the exeuction summary and contents +// of the buffer. Writes the entry to the pipeline. +func (b *eventOutputBuffer) Close(summary consumer.ExecutionSummary) { + // Wait for all content to be written to the buffer + b.wg.Wait() + + ent, err := b.writer.NewEntry(b.String()) + if err != nil { + b.logger.Errorf("event output buffer could not create a new log entry: %s", err) + } + if ent.Attributes == nil { + ent.Attributes = map[string]interface{}{} + } + if b.IncludeCommandName { + ent.Attributes[commandNameLabel] = summary.Command + } + if b.IncludeCommandStatus { + ent.Attributes[commandStatusLabel] = summary.ExitCode + } + if b.IncludeDuration { + ent.Attributes[commandDurationLabel] = summary.RunDuration.Seconds() + } + + b.writer.Write(b.ctx, ent) +} + +// Write to the buffer. Meant to be used by both output streams +// in a monitoring plugin spec compliant way. +// Will accept writes until MaxBodySize is reached. +func (b *eventOutputBuffer) Write(p []byte) (n int, err error) { + b.mu.Lock() + defer b.mu.Unlock() + if rem := int(b.MaxBodySize) - b.buf.Len(); b.MaxBodySize > 0 && len(p) > rem { + if w, wErr := b.buf.Write(p[:rem]); wErr != nil { + return w, wErr + } + return rem, errSizeLimitExceeded + } + return b.buf.Write(p) +} + +func (b *eventOutputBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} diff --git a/pkg/receiver/jobreceiver/output/event/handler_test.go b/pkg/receiver/jobreceiver/output/event/handler_test.go new file mode 100644 index 0000000000..a877479689 --- /dev/null +++ b/pkg/receiver/jobreceiver/output/event/handler_test.go @@ -0,0 +1,70 @@ +package event + +import ( + "bytes" + "context" + "fmt" + "io" + "testing" + "time" + + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" +) + +func TestConsume(t *testing.T) { + var w stubWriter + h := handler{ + logger: nil, + writer: &w, + config: EventConfig{ + IncludeCommandName: true, + IncludeCommandStatus: true, + IncludeDuration: true, + MaxBodySize: 5, + }, + } + + var stdout, stderr bytes.Buffer + fmt.Fprint(&stdout, "hello world") + fmt.Fprint(&stderr, "hello world") + cb := h.Consume(context.Background(), io.NopCloser(&stdout), io.NopCloser(&stderr)) + cb(consumer.ExecutionSummary{ + Command: "exit", + RunDuration: time.Millisecond * 500, + ExitCode: 2, + }) + + if len(w.Out) != 1 { + t.Fatalf("expected handler to write single entry, got %v", w.Out) + } + actualEntry := w.Out[0] + if actualEntry.Body.(string) != "hello" { + t.Errorf("expected handler to write single entry with empty string output, got %v", actualEntry.Body) + } + if actualEntry.Attributes[commandNameLabel] != "exit" { + t.Errorf("expected handler to write entry with command.name, got %v", actualEntry.Attributes) + } + if actualEntry.Attributes[commandStatusLabel] != 2 { + t.Errorf("expected handler to write entry with command.status, got %v", actualEntry.Attributes) + } + if actualEntry.Attributes[commandDurationLabel] != 0.5 { + t.Errorf("expected handler to write entry with command.duration, got %v", actualEntry.Attributes) + } +} + +type stubWriter struct { + Out []*entry.Entry +} + +func (s *stubWriter) NewEntry(value interface{}) (*entry.Entry, error) { + e := entry.New() + e.Attributes = make(map[string]interface{}) + e.Resource = make(map[string]interface{}) + e.Body = value + return e, nil +} + +func (s *stubWriter) Write(ctx context.Context, e *entry.Entry) { + s.Out = append(s.Out, e) +}