Skip to content

Commit

Permalink
decoding fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 13, 2024
1 parent 7de0b71 commit 25c1690
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 26 deletions.
26 changes: 14 additions & 12 deletions pkg/observability/decoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) {
fluentBitMessage.Date = val.(float64)
}
for key, value := range m2 {
switch valueTyped := value.(type) {
case string:
fluentBitMessage.Data[key] = valueTyped
case float64:
fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64)
case []byte:
fluentBitMessage.Data[key] = string(valueTyped)
default:
fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped))
if key != "date" {
switch valueTyped := value.(type) {
case string:
fluentBitMessage.Data[key] = valueTyped
case float64:
fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64)
case []byte:
fluentBitMessage.Data[key] = string(valueTyped)
default:
fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped))
}
}
}
result = append(result, fluentBitMessage)
Expand All @@ -66,15 +68,15 @@ func decodeMessage(msgs []byte) []FluentBitMessage {
isKey := false
key := ""
start := 8 + recordOffset
for kk := 8 + recordOffset; kk < k; kk++ {
for kk := start; kk < k; kk++ {
if msgs[kk] == 0xff {
if isKey {
isKey = false
msg.Data[key] = string(msgs[recordOffset+start+1 : recordOffset+kk])
msg.Data[key] = string(msgs[start+1 : kk])
start = kk + 1
} else {
isKey = true
key = string(msgs[recordOffset+start : recordOffset+kk])
key = string(msgs[start:kk])
start = kk
}
}
Expand Down
20 changes: 14 additions & 6 deletions pkg/observability/decoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,22 @@ package observability

import (
"bytes"
"fmt"
"encoding/json"
"testing"
)

func TestDecoding(t *testing.T) {
data := `[{"date":1720613813.197045,"rand_value":"rand"}]`
messages, err := Decode(bytes.NewBuffer([]byte(data)))
payload := IncomingData{
{
"date": 1720613813.197045,
"rand_value": "rand",
},
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
t.Fatalf("json marshal error: %s", err)
}
messages, err := Decode(bytes.NewBuffer(payloadBytes))
if err != nil {
t.Fatalf("error: %s", err)
}
Expand Down Expand Up @@ -37,16 +46,15 @@ func TestDecodeMsg(t *testing.T) {
"third key": "this is my third value",
},
},
/*{
{
Date: 1720613813.197099,
Data: map[string]string{
"second data set": "my value",
},
},*/
},
}
encoded := encodeMessage(msgs)
decoded := decodeMessage(encoded)
fmt.Printf("decoded: %+v\n", decoded)

if len(msgs) != len(decoded) {
t.Fatalf("length doesn't match")
Expand Down
22 changes: 14 additions & 8 deletions pkg/observability/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package observability
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -18,7 +17,7 @@ func TestIngestionHandler(t *testing.T) {
}
payload := IncomingData{
{
"Date": 1720613813.197045,
"date": 1720613813.197045,
"log": "this is a string",
},
}
Expand All @@ -44,11 +43,18 @@ func TestIngestionHandler(t *testing.T) {
if err != nil {
t.Fatalf("read dir error: %s", err)
}
for _, filename := range dirlist {
filenameOut, err := storage.ReadFile(filename)
if err != nil {
t.Fatalf("read file error: %s", err)
}
fmt.Printf("filenameOut: %s", filenameOut)
if len(dirlist) == 0 {
t.Fatalf("dir is empty")
}
messages, err := storage.ReadFile(dirlist[0])
if err != nil {
t.Fatalf("read file error: %s", err)
}
decodedMessages := decodeMessage(messages)
if decodedMessages[0].Date != 1720613813.197045 {
t.Fatalf("unexpected date")
}
if decodedMessages[0].Data["log"] != "this is a string" {
t.Fatalf("unexpected log data")
}
}

0 comments on commit 25c1690

Please sign in to comment.