Skip to content

Commit

Permalink
fix(logqlengine): adapt to OTEL
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Dec 1, 2023
1 parent e1f81a2 commit 9948616
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 22 deletions.
103 changes: 86 additions & 17 deletions internal/logql/logqlengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"github.com/go-faster/jx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -56,13 +57,88 @@ func (m *mockQuerier) SelectLogs(_ context.Context, start, _ otelstorage.Timesta

for _, l := range m.lines {
ts = ts.Add(step)
records = append(records, logstorage.Record{
body := l.line
rec := logstorage.Record{
Timestamp: otelstorage.NewTimestampFromTime(ts),
Body: l.line,
Attrs: otelstorage.Attrs(l.attrs),
ScopeAttrs: otelstorage.Attrs(scopeAttrs),
ResourceAttrs: otelstorage.Attrs(resAttrs),
})
}
if rec.Attrs == otelstorage.Attrs(pcommon.Map{}) {
rec.Attrs = otelstorage.Attrs(pcommon.NewMap())
}
if dec := jx.DecodeStr(body); dec.Next() == jx.Object {
rec.Body = ""
if err := dec.Obj(func(d *jx.Decoder, key string) error {
switch key {
case logstorage.LabelBody:
v, err := d.Str()
if err != nil {
return err
}
rec.Body = v
return nil
case logstorage.LabelTraceID:
v, err := d.Str()
if err != nil {
return err
}
traceID, err := otelstorage.ParseTraceID(v)
if err != nil {
return err
}
rec.TraceID = traceID
return nil
default:
switch d.Next() {
case jx.String:
v, err := d.Str()
if err != nil {
return err
}
rec.Attrs.AsMap().PutStr(key, v)
return nil
case jx.Bool:
v, err := d.Bool()
if err != nil {
return err
}
rec.Attrs.AsMap().PutBool(key, v)
return nil
case jx.Number:
v, err := d.Num()
if err != nil {
return err
}
if v.IsInt() {
n, err := v.Int64()
if err != nil {
return err
}
rec.Attrs.AsMap().PutInt(key, n)
} else {
n, err := v.Float64()
if err != nil {
return err
}
rec.Attrs.AsMap().PutDouble(key, n)
}
return nil
default:
v, err := d.Raw()
if err != nil {
return err
}
rec.Attrs.AsMap().PutStr(key, string(v))
return nil
}
}
}); err != nil {
return nil, err
}
}
records = append(records, rec)
}

return iterators.Slice(records), nil
Expand Down Expand Up @@ -128,20 +204,7 @@ func TestEngineEvalStream(t *testing.T) {
{
`{resource="test"}`,
inputLines,
[]resultLine{
{
`{"id": 1, "foo": "4m", "bar": "1s", "baz": "1kb"}`,
map[string]string{},
},
{
`{"id": 2, "foo": "5m", "bar": "2s", "baz": "1mb"}`,
map[string]string{},
},
{
`{"id": 3, "foo": "6m", "bar": "3s", "baz": "1gb"}`,
map[string]string{},
},
},
resultLines,
false,
},
{
Expand Down Expand Up @@ -428,7 +491,13 @@ func TestEngineEvalStream(t *testing.T) {
line: e.line,
labels: e.labels,
}
assert.Equal(t, result, tt.wantData[i])
wanna := tt.wantData[i]
if jx.Valid([]byte(wanna.line)) {
assert.JSONEq(t, wanna.line, result.line)
} else {
assert.Equal(t, wanna.line, result.line)
}
assert.Equal(t, wanna.labels, result.labels)
}
})
}
Expand Down
4 changes: 3 additions & 1 deletion internal/logql/logqlengine/label_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ func (l *LabelSet) SetFromRecord(record logstorage.Record) {
if severity := record.SeverityNumber; severity != plog.SeverityNumberUnspecified {
l.Set(logstorage.LabelSeverity, pcommon.NewValueStr(severity.String()))
}
l.Set(logstorage.LabelBody, pcommon.NewValueStr(record.Body))
if body := record.Body; body != "" {
l.Set(logstorage.LabelBody, pcommon.NewValueStr(body))
}
l.SetAttrs(record.Attrs, record.ScopeAttrs, record.ResourceAttrs)
}

Expand Down
9 changes: 5 additions & 4 deletions internal/logql/logqlengine/otel_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ func LineFromRecord(record logstorage.Record) string {
// Create JSON object from record.
e := &jx.Encoder{}
e.Obj(func(e *jx.Encoder) {
e.Field(logstorage.LabelBody, func(e *jx.Encoder) {
e.Str(record.Body)
})

if len(record.Body) != 0 {
e.Field(logstorage.LabelBody, func(e *jx.Encoder) {
e.Str(record.Body)
})
}
if m := record.Attrs.AsMap(); m != (pcommon.Map{}) {
record.Attrs.AsMap().Range(func(k string, v pcommon.Value) bool {
e.Field(k, func(e *jx.Encoder) {
Expand Down

0 comments on commit 9948616

Please sign in to comment.