From 055976b957c73c468f5353227297300f89d07b1d Mon Sep 17 00:00:00 2001 From: Christian Kruse Date: Fri, 8 Sep 2023 09:19:18 -0700 Subject: [PATCH] [pkg/reciever/jobreceiver] Implement logentries output handler Signed-off-by: Christian Kruse --- .../jobreceiver/output/consumer/consumer.go | 38 +------- .../jobreceiver/output/logentries/config.go | 62 ++++++++++++- .../jobreceiver/output/logentries/handler.go | 55 +++++++++++ .../output/logentries/handler_test.go | 91 +++++++++++++++++++ pkg/receiver/jobreceiver/runner.go | 1 + pkg/receiver/jobreceiver/testdata/usage.yaml | 35 +++---- 6 files changed, 228 insertions(+), 54 deletions(-) create mode 100644 pkg/receiver/jobreceiver/output/logentries/handler.go create mode 100644 pkg/receiver/jobreceiver/output/logentries/handler_test.go diff --git a/pkg/receiver/jobreceiver/output/consumer/consumer.go b/pkg/receiver/jobreceiver/output/consumer/consumer.go index 8ce91f9be6..9bf0d4833e 100644 --- a/pkg/receiver/jobreceiver/output/consumer/consumer.go +++ b/pkg/receiver/jobreceiver/output/consumer/consumer.go @@ -1,9 +1,7 @@ package consumer import ( - "bufio" "context" - "fmt" "io" "time" @@ -37,38 +35,6 @@ type WriterOp interface { Write(ctx context.Context, e *entry.Entry) } -// DemoConsumer stub consumer implementation. -// todo(ck) delete - this is a stub implementation for PoC purposes only. -type DemoConsumer struct { - WriterOp - Logger *zap.SugaredLogger -} - -// Consume reads stdout line by line and produces entries -func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.Reader) CloseFunc { - ctx, cancel := context.WithCancel(ctx) - go func() { - scanner := bufio.NewScanner(stdout) - for { - select { - case <-ctx.Done(): - return - default: - } - if !scanner.Scan() { - if scanner.Err() != nil { - panic(scanner.Err()) - } - return - } - ent, err := p.NewEntry(scanner.Text()) - if err != nil { - ent = entry.New() - ent.Body = fmt.Sprintf("error: %s", err) - } - p.Write(ctx, ent) +type contextKey string - } - }() - return func(_ ExecutionSummary) { cancel() } -} +const ContextKeyCommandName = contextKey("commandName") diff --git a/pkg/receiver/jobreceiver/output/logentries/config.go b/pkg/receiver/jobreceiver/output/logentries/config.go index ab92d9a16d..6b6474e274 100644 --- a/pkg/receiver/jobreceiver/output/logentries/config.go +++ b/pkg/receiver/jobreceiver/output/logentries/config.go @@ -1,12 +1,22 @@ package logentries import ( + "bufio" + "fmt" + "io" + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" "go.uber.org/zap" ) +const ( + maxDefaultBufferSize = 32 * 1024 + defaultMaxLogSize = 1024 * 1024 +) + // LogEntriesConfig handles output as if it is a stream of distinct log events type LogEntriesConfig struct { // IncludeCommandName indicates to include the attribute `command.name` @@ -24,7 +34,24 @@ type LogEntriesConfig struct { } func (c *LogEntriesConfig) Build(logger *zap.SugaredLogger, op consumer.WriterOp) (consumer.Interface, error) { - return &consumer.DemoConsumer{WriterOp: op, Logger: logger}, nil + + encoding, err := decoder.LookupEncoding(c.Encoding) + if err != nil { + return nil, fmt.Errorf("log_entries configuration unable to use encoding %s: %w", c.Encoding, err) + } + splitFunc, err := c.Multiline.Build(encoding, true, true, true, nil, int(c.MaxLogSize)) + if err != nil { + return nil, fmt.Errorf("log_entries configuration could not build split function: %w", err) + } + return &handler{ + logger: logger, + writer: op, + config: *c, + scanFactory: scannerFactory{ + splitFunc: splitFunc, + maxLogSize: int(c.MaxLogSize), + }, + }, nil } type LogEntriesConfigFactory struct{} @@ -33,5 +60,38 @@ func (LogEntriesConfigFactory) CreateDefaultConfig() consumer.Builder { return &LogEntriesConfig{ IncludeCommandName: true, IncludeStreamName: true, + MaxLogSize: defaultMaxLogSize, + } +} + +type scannerFactory struct { + maxLogSize int + splitFunc bufio.SplitFunc +} + +func (f scannerFactory) Build(in io.Reader) *bufio.Scanner { + scanner := bufio.NewScanner(in) + + if f.maxLogSize <= 0 { + f.maxLogSize = defaultMaxLogSize + } + bufferSize := f.maxLogSize / 2 + if bufferSize > maxDefaultBufferSize { + bufferSize = maxDefaultBufferSize + } + scanner.Buffer(make([]byte, 0, bufferSize), f.maxLogSize) + scanner.Split(f.splitWithTruncate()) + return scanner +} + +func (f scannerFactory) splitWithTruncate() bufio.SplitFunc { + return func(data []byte, atEOF bool) (advance int, token []byte, err error) { + advance, token, err = f.splitFunc(data, atEOF) + if advance == 0 && token == nil && len(data) >= f.maxLogSize { + advance, token = f.maxLogSize, data[:f.maxLogSize] + } else if len(token) > f.maxLogSize { + advance, token = f.maxLogSize, data[:f.maxLogSize] + } + return } } diff --git a/pkg/receiver/jobreceiver/output/logentries/handler.go b/pkg/receiver/jobreceiver/output/logentries/handler.go new file mode 100644 index 0000000000..5b6a3fd6a5 --- /dev/null +++ b/pkg/receiver/jobreceiver/output/logentries/handler.go @@ -0,0 +1,55 @@ +package logentries + +import ( + "context" + "io" + + "github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer" + "go.uber.org/zap" +) + +const ( + streamNameStdout = "stdout" + streamNameStderr = "stderr" + commandNameLabel = "command.name" + commandStreamNameLabel = "command.stream.name" +) + +type handler struct { + logger *zap.SugaredLogger + writer consumer.WriterOp + + config LogEntriesConfig + scanFactory scannerFactory +} + +func (h *handler) Consume(ctx context.Context, stdout, stderr io.Reader) consumer.CloseFunc { + go h.consume(ctx, stdout, streamNameStdout) + go h.consume(ctx, stderr, streamNameStderr) + return nopCloser +} +func (h *handler) consume(ctx context.Context, in io.Reader, stream string) { + + scanner := h.scanFactory.Build(in) + for scanner.Scan() { + ent, err := h.writer.NewEntry(scanner.Text()) + if err != nil { + h.logger.Errorf("log entry handler could not create a new log entry: %s", err) + } + if ent.Attributes == nil { + ent.Attributes = map[string]interface{}{} + } + if h.config.IncludeCommandName { + ent.Attributes[commandNameLabel] = ctx.Value(consumer.ContextKeyCommandName) + } + if h.config.IncludeStreamName { + ent.Attributes[commandStreamNameLabel] = stream + } + h.writer.Write(ctx, ent) + } + if err := scanner.Err(); err != nil { + h.logger.Errorf("error reading input stream %s: %w", stream, err) + } +} + +func nopCloser(_ consumer.ExecutionSummary) {} diff --git a/pkg/receiver/jobreceiver/output/logentries/handler_test.go b/pkg/receiver/jobreceiver/output/logentries/handler_test.go new file mode 100644 index 0000000000..347a7e28a9 --- /dev/null +++ b/pkg/receiver/jobreceiver/output/logentries/handler_test.go @@ -0,0 +1,91 @@ +package logentries + +import ( + "context" + "fmt" + "io" + "strings" + "sync" + "testing" + "time" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestConsumeWithMaxLogSize(t *testing.T) { + var w stubWriter + cfg := LogEntriesConfig{ + IncludeCommandName: true, + IncludeStreamName: true, + MaxLogSize: 128, + } + h, err := cfg.Build(zap.NewNop().Sugar(), &w) + if err != nil { + t.Fatal(err) + } + + stdoutR, stdoutW := io.Pipe() + stderrR, stderrW := io.Pipe() + + writeStdout := func(w io.WriteCloser) { + fmt.Fprint(w, strings.Repeat("a", 64)) + fmt.Fprint(w, strings.Repeat("a", 64)) + for i := 0; i < 64; i = i + 8 { + fmt.Fprint(w, strings.Repeat("b", 8)) + } + fmt.Fprint(w, "\n") + w.Close() + } + + writeStderr := func(w io.WriteCloser) { + fmt.Fprint(w, "hello world") + w.Close() + } + h.Consume(context.Background(), stdoutR, stderrR) + + go writeStdout(stdoutW) + go writeStderr(stderrW) + + done := make(chan struct{}) + go func() { + close(done) + }() + + require.Eventually(t, + func() bool { + w.MU.Lock() + defer w.MU.Unlock() + return len(w.Out) == 3 + }, + time.Second, + time.Millisecond*100, + "expected three log entries out", + ) + for _, ent := range w.Out { + body, ok := ent.Body.(string) + require.True(t, ok) + assert.LessOrEqual(t, len(body), 128) + } +} + +type stubWriter struct { + MU sync.Mutex + Out []*entry.Entry +} + +func (s *stubWriter) NewEntry(value interface{}) (*entry.Entry, error) { + e := entry.New() + e.Attributes = make(map[string]interface{}) + e.Resource = make(map[string]interface{}) + e.Body = value + return e, nil +} + +func (s *stubWriter) Write(ctx context.Context, e *entry.Entry) { + s.MU.Lock() + s.Out = append(s.Out, e) + s.MU.Unlock() +} diff --git a/pkg/receiver/jobreceiver/runner.go b/pkg/receiver/jobreceiver/runner.go index 269afdae53..5a82ae661e 100644 --- a/pkg/receiver/jobreceiver/runner.go +++ b/pkg/receiver/jobreceiver/runner.go @@ -29,6 +29,7 @@ type stubRunner struct { func (r *stubRunner) Start(operator.Persister) error { go func() { ctx := context.Background() + ctx = context.WithValue(ctx, consumer.ContextKeyCommandName, r.Exec.Command) cmd := command.NewExecution(ctx, command.ExecutionRequest{ Command: r.Exec.Command, Arguments: r.Exec.Arguments, diff --git a/pkg/receiver/jobreceiver/testdata/usage.yaml b/pkg/receiver/jobreceiver/testdata/usage.yaml index 2f5e904a74..6e8f9748ea 100644 --- a/pkg/receiver/jobreceiver/testdata/usage.yaml +++ b/pkg/receiver/jobreceiver/testdata/usage.yaml @@ -4,9 +4,11 @@ receivers: schedule: interval: 1h exec: - command: echo + command: /bin/sh + timeout: 1m arguments: - - "hello world" + - '-c' + - 'for i in {0..120}; do echo -n "werning: " && date -Iseconds --date "90 seconds ago" && sleep 1 && echo "multi line $(head -c 12 /dev/urandom | base64)"; done;' output: type: log_entries log_entries: @@ -14,6 +16,8 @@ receivers: include_stream_name: true max_log_size: '16kb' encoding: 'utf-8' + multiline: + line_start_pattern: '\w+:.*' attributes: type: log resource: @@ -21,28 +25,30 @@ receivers: operators: - type: regex_parser parse_from: body - regex: '^(?P\w+).*$' + regex: '^(?P\w+):\s+(?P.*)' - type: severity_parser - parse_from: attributes.first + parse_from: attributes.level mapping: - error: total + error: errrrer warn: + - werning - nonsense - - '.' - - hello + - 'w.a.r.n.' + - type: time_parser + parse_from: attributes.ts + layout_type: gotime + layout: '2006-01-02T15:04:05-07:00' + monitoringjob/event: schedule: interval: 1h exec: - command: /bin/sh + command: uptime timeout: 1m - arguments: - - '-c' - - 'for i in {0..120}; do date -Iseconds --date "90 seconds ago" && sleep 1; done;' output: type: event event: - include_command_name: false + include_command_name: true include_command_status: true include_command_duration: true max_body_size: 1024 @@ -50,11 +56,6 @@ receivers: type: event resource: bingo: bango - operators: - - type: time_parser - parse_from: body - layout_type: gotime - layout: '2006-01-02T15:04:05-07:00' exporters: