Skip to content

Commit

Permalink
[pkg/reciever/jobreceiver] Implement event output handler
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse committed Sep 22, 2023
1 parent 5b028e2 commit 885bda4
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 1 deletion.
8 changes: 7 additions & 1 deletion pkg/receiver/jobreceiver/output/event/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import (
"go.uber.org/zap"
)

const (
commandNameLabel = "command.name"
commandStatusLabel = "command.status"
commandDurationLabel = "command.duration"
)

// EventConfig handles output as if it is a monitoring job.
// Should emit a single event per command execution summarizing the execution.
type EventConfig struct {
Expand All @@ -22,7 +28,7 @@ type EventConfig struct {
}

func (c *EventConfig) Build(logger *zap.SugaredLogger, op consumer.WriterOp) (consumer.Interface, error) {
return &consumer.DemoConsumer{WriterOp: op, Logger: logger}, nil
return &handler{writer: op, logger: logger, config: *c}, nil
}

type EventConfigFactory struct{}
Expand Down
116 changes: 116 additions & 0 deletions pkg/receiver/jobreceiver/output/event/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package event

import (
"bytes"
"context"
"errors"
"io"
"sync"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
"go.uber.org/zap"
)

var errSizeLimitExceeded = errors.New("buffer size limit exceeded")

type handler struct {
logger *zap.SugaredLogger
writer consumer.WriterOp

config EventConfig
}

var _ consumer.Interface = (*handler)(nil)

// Consume
func (h *handler) Consume(ctx context.Context, stdout, stderr io.Reader) consumer.CloseFunc {
e := eventOutputBuffer{
ctx: ctx,
logger: h.logger,
writer: h.writer,
EventConfig: h.config,
}
e.Start(stdout, stderr)
return e.Close
}

type eventOutputBuffer struct {
EventConfig

ctx context.Context
logger *zap.SugaredLogger
writer consumer.WriterOp

wg sync.WaitGroup
mu sync.Mutex
buf bytes.Buffer
}

// Start consuming the intput streams into the buffer
func (b *eventOutputBuffer) Start(stdout, stderr io.Reader) {
b.wg.Add(2)
go b.consume(stdout)
go b.consume(stderr)
}

// consume reads the input into the buffer until either EOF is reached or the
// buffer is full. Once the buffer is full, consume discards remaining input
// until EOF or read error
func (b *eventOutputBuffer) consume(in io.Reader) {
defer b.wg.Done()

_, err := io.Copy(b, in)
if errors.Is(err, errSizeLimitExceeded) {
_, err = io.Copy(io.Discard, in)
}
if err != nil {
b.logger.Errorf("io error consuming event input: %s", err)
}
}

// Close builds a new log entry based off the exeuction summary and contents
// of the buffer. Writes the entry to the pipeline.
func (b *eventOutputBuffer) Close(summary consumer.ExecutionSummary) {
// Wait for all content to be written to the buffer
b.wg.Wait()

ent, err := b.writer.NewEntry(b.String())
if err != nil {
b.logger.Errorf("event output buffer could not create a new log entry: %s", err)
}
if ent.Attributes == nil {
ent.Attributes = map[string]interface{}{}
}
if b.IncludeCommandName {
ent.Attributes[commandNameLabel] = summary.Command
}
if b.IncludeCommandStatus {
ent.Attributes[commandStatusLabel] = summary.ExitCode
}
if b.IncludeDuration {
ent.Attributes[commandDurationLabel] = summary.RunDuration.Seconds()
}

b.writer.Write(b.ctx, ent)
}

// Write to the buffer. Meant to be used by both output streams
// in a monitoring plugin spec compliant way.
// Will accept writes until MaxBodySize is reached.
func (b *eventOutputBuffer) Write(p []byte) (n int, err error) {
b.mu.Lock()
defer b.mu.Unlock()
if rem := int(b.MaxBodySize) - b.buf.Len(); b.MaxBodySize > 0 && len(p) > rem {
if w, wErr := b.buf.Write(p[:rem]); wErr != nil {
return w, wErr
}
return rem, errSizeLimitExceeded
}
return b.buf.Write(p)
}

func (b *eventOutputBuffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return b.buf.String()
}
70 changes: 70 additions & 0 deletions pkg/receiver/jobreceiver/output/event/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package event

import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

func TestConsume(t *testing.T) {
var w stubWriter
h := handler{
logger: nil,
writer: &w,
config: EventConfig{
IncludeCommandName: true,
IncludeCommandStatus: true,
IncludeDuration: true,
MaxBodySize: 5,
},
}

var stdout, stderr bytes.Buffer
fmt.Fprint(&stdout, "hello world")
fmt.Fprint(&stderr, "hello world")
cb := h.Consume(context.Background(), io.NopCloser(&stdout), io.NopCloser(&stderr))
cb(consumer.ExecutionSummary{
Command: "exit",
RunDuration: time.Millisecond * 500,
ExitCode: 2,
})

if len(w.Out) != 1 {
t.Fatalf("expected handler to write single entry, got %v", w.Out)
}
actualEntry := w.Out[0]
if actualEntry.Body.(string) != "hello" {
t.Errorf("expected handler to write single entry with empty string output, got %v", actualEntry.Body)
}
if actualEntry.Attributes[commandNameLabel] != "exit" {
t.Errorf("expected handler to write entry with command.name, got %v", actualEntry.Attributes)
}
if actualEntry.Attributes[commandStatusLabel] != 2 {
t.Errorf("expected handler to write entry with command.status, got %v", actualEntry.Attributes)
}
if actualEntry.Attributes[commandDurationLabel] != 0.5 {
t.Errorf("expected handler to write entry with command.duration, got %v", actualEntry.Attributes)
}
}

type stubWriter struct {
Out []*entry.Entry
}

func (s *stubWriter) NewEntry(value interface{}) (*entry.Entry, error) {
e := entry.New()
e.Attributes = make(map[string]interface{})
e.Resource = make(map[string]interface{})
e.Body = value
return e, nil
}

func (s *stubWriter) Write(ctx context.Context, e *entry.Entry) {
s.Out = append(s.Out, e)
}

0 comments on commit 885bda4

Please sign in to comment.