Skip to content

Commit

Permalink
[pkg/receiver/jobreceiver] Implement logentries output handler
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Kruse <[email protected]>
  • Loading branch information
c-kruse committed Sep 22, 2023
1 parent 885bda4 commit 055976b
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 54 deletions.
38 changes: 2 additions & 36 deletions pkg/receiver/jobreceiver/output/consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package consumer

import (
"bufio"
"context"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -37,38 +35,6 @@ type WriterOp interface {
Write(ctx context.Context, e *entry.Entry)
}

// DemoConsumer stub consumer implementation.
// todo(ck) delete - this is a stub implementation for PoC purposes only.
type DemoConsumer struct {
WriterOp
Logger *zap.SugaredLogger
}

// Consume reads stdout line by line and produces entries
func (p *DemoConsumer) Consume(ctx context.Context, stdout, stderr io.Reader) CloseFunc {
ctx, cancel := context.WithCancel(ctx)
go func() {
scanner := bufio.NewScanner(stdout)
for {
select {
case <-ctx.Done():
return
default:
}
if !scanner.Scan() {
if scanner.Err() != nil {
panic(scanner.Err())
}
return
}
ent, err := p.NewEntry(scanner.Text())
if err != nil {
ent = entry.New()
ent.Body = fmt.Sprintf("error: %s", err)
}
p.Write(ctx, ent)
type contextKey string

}
}()
return func(_ ExecutionSummary) { cancel() }
}
const ContextKeyCommandName = contextKey("commandName")
62 changes: 61 additions & 1 deletion pkg/receiver/jobreceiver/output/logentries/config.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package logentries

import (
"bufio"
"fmt"
"io"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decoder"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split"
"go.uber.org/zap"
)

const (
	// maxDefaultBufferSize caps the scanner's initial buffer allocation (32 KiB);
	// the buffer can still grow up to the configured max log size.
	maxDefaultBufferSize = 32 * 1024
	// defaultMaxLogSize is the default upper bound on a single log entry (1 MiB).
	defaultMaxLogSize = 1024 * 1024
)

// LogEntriesConfig handles output as if it is a stream of distinct log events
type LogEntriesConfig struct {
// IncludeCommandName indicates to include the attribute `command.name`
Expand All @@ -24,7 +34,24 @@ type LogEntriesConfig struct {
}

// Build validates the log_entries configuration and constructs a consumer
// that emits one log entry per token produced by the configured encoding
// and multiline split options.
//
// Returns an error if the encoding is unknown or the split function
// cannot be built.
func (c *LogEntriesConfig) Build(logger *zap.SugaredLogger, op consumer.WriterOp) (consumer.Interface, error) {
	encoding, err := decoder.LookupEncoding(c.Encoding)
	if err != nil {
		return nil, fmt.Errorf("log_entries configuration unable to use encoding %s: %w", c.Encoding, err)
	}
	splitFunc, err := c.Multiline.Build(encoding, true, true, true, nil, int(c.MaxLogSize))
	if err != nil {
		return nil, fmt.Errorf("log_entries configuration could not build split function: %w", err)
	}
	return &handler{
		logger: logger,
		writer: op,
		config: *c,
		scanFactory: scannerFactory{
			splitFunc:  splitFunc,
			maxLogSize: int(c.MaxLogSize),
		},
	}, nil
}

// LogEntriesConfigFactory produces default log_entries output configurations.
type LogEntriesConfigFactory struct{}
Expand All @@ -33,5 +60,38 @@ func (LogEntriesConfigFactory) CreateDefaultConfig() consumer.Builder {
return &LogEntriesConfig{
IncludeCommandName: true,
IncludeStreamName: true,
MaxLogSize: defaultMaxLogSize,
}
}

// scannerFactory builds bufio.Scanners that tokenize an output stream
// with a configured split function, bounded by maxLogSize.
type scannerFactory struct {
	// maxLogSize bounds both the scanner's buffer and the emitted tokens.
	maxLogSize int
	// splitFunc tokenizes the stream (e.g. newline or multiline pattern).
	splitFunc bufio.SplitFunc
}

// Build constructs a scanner over in, configured with the factory's
// split function and truncation behavior. The initial buffer is half of
// maxLogSize, capped at maxDefaultBufferSize; the scanner may grow it
// up to maxLogSize.
func (f scannerFactory) Build(in io.Reader) *bufio.Scanner {
	if f.maxLogSize <= 0 {
		f.maxLogSize = defaultMaxLogSize
	}
	initial := f.maxLogSize / 2
	if initial > maxDefaultBufferSize {
		initial = maxDefaultBufferSize
	}
	s := bufio.NewScanner(in)
	s.Buffer(make([]byte, 0, initial), f.maxLogSize)
	s.Split(f.splitWithTruncate())
	return s
}

// splitWithTruncate wraps the configured split function so the scanner
// can always make progress: when the splitter asks for more data but the
// buffer already holds maxLogSize bytes, or when it yields a token larger
// than maxLogSize, the output is truncated to maxLogSize bytes.
//
// NOTE(review): truncation slices data[:maxLogSize], which assumes tokens
// are anchored at the start of data — confirm against the configured
// split functions.
func (f scannerFactory) splitWithTruncate() bufio.SplitFunc {
	return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		advance, token, err = f.splitFunc(data, atEOF)
		// Splitter wants more data but the buffer is full: force a truncated token.
		if advance == 0 && token == nil && len(data) >= f.maxLogSize {
			advance, token = f.maxLogSize, data[:f.maxLogSize]
		} else if len(token) > f.maxLogSize {
			// Oversized token: clamp it to the configured limit.
			advance, token = f.maxLogSize, data[:f.maxLogSize]
		}
		return
	}
}
55 changes: 55 additions & 0 deletions pkg/receiver/jobreceiver/output/logentries/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package logentries

import (
"context"
"io"

"github.com/SumoLogic/sumologic-otel-collector/pkg/receiver/jobreceiver/output/consumer"
"go.uber.org/zap"
)

const (
	// streamNameStdout / streamNameStderr are the values recorded on the
	// command.stream.name attribute to identify the source stream.
	streamNameStdout = "stdout"
	streamNameStderr = "stderr"
	// Attribute keys used to decorate each emitted entry.
	commandNameLabel       = "command.name"
	commandStreamNameLabel = "command.stream.name"
)

// handler consumes a command's output streams as distinct log entries.
type handler struct {
	logger *zap.SugaredLogger
	// writer creates entries and emits them to the pipeline.
	writer consumer.WriterOp

	// config controls which attributes decorate each entry.
	config LogEntriesConfig
	// scanFactory builds the scanners used to tokenize each stream.
	scanFactory scannerFactory
}

// Consume starts one goroutine per output stream, each writing an entry
// per scanned token until its reader is exhausted.
//
// NOTE(review): the returned CloseFunc is a no-op — it neither cancels
// nor waits for the consumer goroutines, so entries may still be written
// after close. Confirm callers close stdout/stderr on process exit and
// tolerate late writes.
func (h *handler) Consume(ctx context.Context, stdout, stderr io.Reader) consumer.CloseFunc {
	go h.consume(ctx, stdout, streamNameStdout)
	go h.consume(ctx, stderr, streamNameStderr)
	return nopCloser
}
// consume tokenizes in with the configured scanner and writes one entry
// per token, tagged with the stream name and (from ctx) the command name
// when those attributes are enabled.
func (h *handler) consume(ctx context.Context, in io.Reader, stream string) {
	scanner := h.scanFactory.Build(in)
	for scanner.Scan() {
		ent, err := h.writer.NewEntry(scanner.Text())
		if err != nil {
			h.logger.Errorf("log entry handler could not create a new log entry: %s", err)
			// ent may be nil on error; skip it rather than dereference below.
			continue
		}
		if ent.Attributes == nil {
			ent.Attributes = map[string]interface{}{}
		}
		if h.config.IncludeCommandName {
			ent.Attributes[commandNameLabel] = ctx.Value(consumer.ContextKeyCommandName)
		}
		if h.config.IncludeStreamName {
			ent.Attributes[commandStreamNameLabel] = stream
		}
		h.writer.Write(ctx, ent)
	}
	if err := scanner.Err(); err != nil {
		// %w is only meaningful to fmt.Errorf; printf-style loggers need %s.
		h.logger.Errorf("error reading input stream %s: %s", stream, err)
	}
}

// nopCloser is a consumer.CloseFunc that does nothing.
func nopCloser(_ consumer.ExecutionSummary) {}
91 changes: 91 additions & 0 deletions pkg/receiver/jobreceiver/output/logentries/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package logentries

import (
"context"
"fmt"
"io"
"strings"
"sync"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

// TestConsumeWithMaxLogSize verifies that the log_entries handler
// truncates entries to the configured MaxLogSize and emits entries for
// both stdout and stderr.
func TestConsumeWithMaxLogSize(t *testing.T) {
	var w stubWriter
	cfg := LogEntriesConfig{
		IncludeCommandName: true,
		IncludeStreamName:  true,
		MaxLogSize:         128,
	}
	h, err := cfg.Build(zap.NewNop().Sugar(), &w)
	if err != nil {
		t.Fatal(err)
	}

	stdoutR, stdoutW := io.Pipe()
	stderrR, stderrW := io.Pipe()

	// 192 bytes with no line break followed by a newline: the handler
	// should emit a truncated 128-byte entry plus the remainder.
	writeStdout := func(w io.WriteCloser) {
		fmt.Fprint(w, strings.Repeat("a", 64))
		fmt.Fprint(w, strings.Repeat("a", 64))
		for i := 0; i < 64; i = i + 8 {
			fmt.Fprint(w, strings.Repeat("b", 8))
		}
		fmt.Fprint(w, "\n")
		w.Close()
	}

	writeStderr := func(w io.WriteCloser) {
		fmt.Fprint(w, "hello world")
		w.Close()
	}
	h.Consume(context.Background(), stdoutR, stderrR)

	go writeStdout(stdoutW)
	go writeStderr(stderrW)

	require.Eventually(t,
		func() bool {
			w.MU.Lock()
			defer w.MU.Unlock()
			return len(w.Out) == 3
		},
		time.Second,
		time.Millisecond*100,
		"expected three log entries out",
	)

	// Inspect the collected entries under the mutex: the consumer
	// goroutines may still be running.
	w.MU.Lock()
	defer w.MU.Unlock()
	for _, ent := range w.Out {
		body, ok := ent.Body.(string)
		require.True(t, ok)
		assert.LessOrEqual(t, len(body), 128)
	}
}

// stubWriter is a test double for consumer.WriterOp that records every
// written entry.
type stubWriter struct {
	// MU guards Out; entries are written from consumer goroutines.
	MU  sync.Mutex
	Out []*entry.Entry
}

// NewEntry returns a fresh entry whose body is the provided value, with
// attribute and resource maps pre-initialized.
func (s *stubWriter) NewEntry(value interface{}) (*entry.Entry, error) {
	ent := entry.New()
	ent.Body = value
	ent.Attributes = map[string]interface{}{}
	ent.Resource = map[string]interface{}{}
	return ent, nil
}

// Write records the entry for later inspection by the test.
func (s *stubWriter) Write(_ context.Context, e *entry.Entry) {
	s.MU.Lock()
	defer s.MU.Unlock()
	s.Out = append(s.Out, e)
}
1 change: 1 addition & 0 deletions pkg/receiver/jobreceiver/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type stubRunner struct {
func (r *stubRunner) Start(operator.Persister) error {
go func() {
ctx := context.Background()
ctx = context.WithValue(ctx, consumer.ContextKeyCommandName, r.Exec.Command)
cmd := command.NewExecution(ctx, command.ExecutionRequest{
Command: r.Exec.Command,
Arguments: r.Exec.Arguments,
Expand Down
35 changes: 18 additions & 17 deletions pkg/receiver/jobreceiver/testdata/usage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,57 +4,58 @@ receivers:
schedule:
interval: 1h
exec:
command: echo
command: /bin/sh
timeout: 1m
arguments:
- "hello world"
- '-c'
- 'for i in {0..120}; do echo -n "werning: " && date -Iseconds --date "90 seconds ago" && sleep 1 && echo "multi line $(head -c 12 /dev/urandom | base64)"; done;'
output:
type: log_entries
log_entries:
include_command_name: true
include_stream_name: true
max_log_size: '16kb'
encoding: 'utf-8'
multiline:
line_start_pattern: '\w+:.*'
attributes:
type: log
resource:
bingo: bango
operators:
- type: regex_parser
parse_from: body
regex: '^(?P<first>\w+).*$'
regex: '^(?P<level>\w+):\s+(?P<ts>.*)'
- type: severity_parser
parse_from: attributes.first
parse_from: attributes.level
mapping:
error: total
error: errrrer
warn:
- werning
- nonsense
- '.'
- hello
- 'w.a.r.n.'
- type: time_parser
parse_from: attributes.ts
layout_type: gotime
layout: '2006-01-02T15:04:05-07:00'

monitoringjob/event:
schedule:
interval: 1h
exec:
command: /bin/sh
command: uptime
timeout: 1m
arguments:
- '-c'
- 'for i in {0..120}; do date -Iseconds --date "90 seconds ago" && sleep 1; done;'
output:
type: event
event:
include_command_name: false
include_command_name: true
include_command_status: true
include_command_duration: true
max_body_size: 1024
attributes:
type: event
resource:
bingo: bango
operators:
- type: time_parser
parse_from: body
layout_type: gotime
layout: '2006-01-02T15:04:05-07:00'


exporters:
Expand Down

0 comments on commit 055976b

Please sign in to comment.