diff --git a/pkg/observability/decoding.go b/pkg/observability/decoding.go index 8aaf6db..5339645 100644 --- a/pkg/observability/decoding.go +++ b/pkg/observability/decoding.go @@ -32,15 +32,17 @@ func Decode(r io.Reader) ([]FluentBitMessage, error) { fluentBitMessage.Date = val.(float64) } for key, value := range m2 { - switch valueTyped := value.(type) { - case string: - fluentBitMessage.Data[key] = valueTyped - case float64: - fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) - case []byte: - fluentBitMessage.Data[key] = string(valueTyped) - default: - fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + if key != "date" { + switch valueTyped := value.(type) { + case string: + fluentBitMessage.Data[key] = valueTyped + case float64: + fluentBitMessage.Data[key] = strconv.FormatFloat(valueTyped, 'f', -1, 64) + case []byte: + fluentBitMessage.Data[key] = string(valueTyped) + default: + fmt.Printf("no hit on type: %s", reflect.TypeOf(valueTyped)) + } } } result = append(result, fluentBitMessage) @@ -66,15 +68,15 @@ func decodeMessage(msgs []byte) []FluentBitMessage { isKey := false key := "" start := 8 + recordOffset - for kk := 8 + recordOffset; kk < k; kk++ { + for kk := start; kk < k; kk++ { if msgs[kk] == 0xff { if isKey { isKey = false - msg.Data[key] = string(msgs[recordOffset+start+1 : recordOffset+kk]) + msg.Data[key] = string(msgs[start+1 : kk]) start = kk + 1 } else { isKey = true - key = string(msgs[recordOffset+start : recordOffset+kk]) + key = string(msgs[start:kk]) start = kk } } diff --git a/pkg/observability/decoding_test.go b/pkg/observability/decoding_test.go index 6d0c108..f506488 100644 --- a/pkg/observability/decoding_test.go +++ b/pkg/observability/decoding_test.go @@ -2,13 +2,22 @@ package observability import ( "bytes" - "fmt" + "encoding/json" "testing" ) func TestDecoding(t *testing.T) { - data := `[{"date":1720613813.197045,"rand_value":"rand"}]` - messages, err := Decode(bytes.NewBuffer([]byte(data))) + payload := IncomingData{ + { + "date": 1720613813.197045, + "rand_value": "rand", + }, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + t.Fatalf("json marshal error: %s", err) + } + messages, err := Decode(bytes.NewBuffer(payloadBytes)) if err != nil { t.Fatalf("error: %s", err) } @@ -37,16 +46,15 @@ func TestDecodeMsg(t *testing.T) { "third key": "this is my third value", }, }, - /*{ + { Date: 1720613813.197099, Data: map[string]string{ "second data set": "my value", }, - },*/ + }, } encoded := encodeMessage(msgs) decoded := decodeMessage(encoded) - fmt.Printf("decoded: %+v\n", decoded) if len(msgs) != len(decoded) { t.Fatalf("length doesn't match") diff --git a/pkg/observability/handlers_test.go b/pkg/observability/handlers_test.go index 719386d..a46c64e 100644 --- a/pkg/observability/handlers_test.go +++ b/pkg/observability/handlers_test.go @@ -3,7 +3,6 @@ package observability import ( "bytes" "encoding/json" - "fmt" "net/http" "net/http/httptest" "testing" @@ -18,7 +17,7 @@ func TestIngestionHandler(t *testing.T) { } payload := IncomingData{ { - "Date": 1720613813.197045, + "date": 1720613813.197045, "log": "this is a string", }, } @@ -44,11 +43,18 @@ func TestIngestionHandler(t *testing.T) { if err != nil { t.Fatalf("read dir error: %s", err) } - for _, filename := range dirlist { - filenameOut, err := storage.ReadFile(filename) - if err != nil { - t.Fatalf("read file error: %s", err) - } - fmt.Printf("filenameOut: %s", filenameOut) + if len(dirlist) == 0 { + t.Fatalf("dir is empty") + } + messages, err := storage.ReadFile(dirlist[0]) + if err != nil { + t.Fatalf("read file error: %s", err) + } + decodedMessages := decodeMessage(messages) + if decodedMessages[0].Date != 1720613813.197045 { + t.Fatalf("unexpected date") + } + if decodedMessages[0].Data["log"] != "this is a string" { + t.Fatalf("unexpected log data") } }