From 1c2ec56a6a135dde910e09fdae8024dd66c9eb2a Mon Sep 17 00:00:00 2001 From: andriikushch Date: Wed, 30 Oct 2024 12:29:26 +0100 Subject: [PATCH] fix: parser for azureeventhubs message without time field (#1990) * fix: parser for azureeventhubs message without time field * update CHANGELOG.md * Update CHANGELOG.md Co-authored-by: William Dumont * update CHANGELOG.md * update link to the corresponded latest change in the Loki project --------- Co-authored-by: William Dumont --- CHANGELOG.md | 2 + .../internal/parser/parser.go | 40 ++++++++++++++++--- .../internal/parser/parser_test.go | 34 ++++++++++++++++ .../message_without_time_and_time_stamp.json | 9 +++++ .../message_without_time_with_time_stamp.json | 10 +++++ 5 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_and_time_stamp.json create mode 100644 internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_with_time_stamp.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f05479234..8ee417f07f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,8 @@ Main (unreleased) - Add Prometheus bearer authentication to a `prometheus.write.queue` component (@freak12techno) +- Support logs that have a `timestamp` field instead of a `time` field for the `loki.source.azure_event_hubs` component. (@andriikushch) + - Add `proxy_url` to `otelcol.exporter.otlphttp`. (@wildum) ### Bugfixes diff --git a/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go b/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go index 508745a9e7..8b3ec7f606 100644 --- a/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go +++ b/internal/component/loki/source/azure_event_hubs/internal/parser/parser.go @@ -1,6 +1,6 @@ package parser -// This code is copied from Promtail (https://github.com/grafana/loki/commit/065bee7e72b00d800431f4b70f0d673d6e0e7a2b). The parser package is used to +// This code is copied from Promtail (https://github.com/grafana/loki/commit/2e62abbf47c47041027baf240722b3d76e7bd9a3). The parser package is used to // enable parsing entries from Azure Event Hubs entries and forward them // to other loki components. @@ -36,7 +36,9 @@ func (l azureMonitorResourceLogs) validate() error { // azureMonitorResourceLog used to unmarshal common schema for Azure resource logs // https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/resource-logs-schema type azureMonitorResourceLog struct { - Time string `json:"time"` + Time string `json:"time"` + // Some logs have `time` field, some have `timeStamp` field : https://github.com/grafana/loki/issues/14176 + TimeStamp string `json:"timeStamp"` Category string `json:"category"` ResourceID string `json:"resourceId"` OperationName string `json:"operationName"` @@ -44,7 +46,7 @@ type azureMonitorResourceLog struct { // validate check if fields marked as required by schema for Azure resource log are not empty func (l azureMonitorResourceLog) validate() error { - valid := len(l.Time) != 0 && + valid := l.isTimeOrTimeStampFieldSet() && len(l.Category) != 0 && len(l.ResourceID) != 0 && len(l.OperationName) != 0 @@ -56,6 +58,34 @@ func (l azureMonitorResourceLog) validate() error { return nil } +func (l azureMonitorResourceLog) isTimeOrTimeStampFieldSet() bool { + return len(l.Time) != 0 || len(l.TimeStamp) != 0 +} + +// getTime returns time from `time` or `timeStamp` field. If both fields are set, `time` is used. If both fields are empty, error is returned. +func (l azureMonitorResourceLog) getTime() (time.Time, error) { + if len(l.Time) == 0 && len(l.TimeStamp) == 0 { + var t time.Time + return t, errors.New("time and timeStamp fields are empty") + } + + if len(l.Time) != 0 { + t, err := time.Parse(time.RFC3339, l.Time) + if err != nil { + return t, err + } + + return t.UTC(), nil + } + + t, err := time.Parse(time.RFC3339, l.TimeStamp) + if err != nil { + return t, err + } + + return t.UTC(), nil +} + type AzureEventHubsTargetMessageParser struct { DisallowCustomMessages bool } @@ -156,11 +186,11 @@ func (e *AzureEventHubsTargetMessageParser) parseRecord(record []byte, labelSet } func (e *AzureEventHubsTargetMessageParser) getTime(messageTime time.Time, useIncomingTimestamp bool, logRecord *azureMonitorResourceLog) time.Time { - if !useIncomingTimestamp || logRecord.Time == "" { + if !useIncomingTimestamp || !logRecord.isTimeOrTimeStampFieldSet() { return messageTime } - recordTime, err := time.Parse(time.RFC3339, logRecord.Time) + recordTime, err := logRecord.getTime() if err != nil { return messageTime } diff --git a/internal/component/loki/source/azure_event_hubs/internal/parser/parser_test.go b/internal/component/loki/source/azure_event_hubs/internal/parser/parser_test.go index 3ade117008..be064ca3f5 100644 --- a/internal/component/loki/source/azure_event_hubs/internal/parser/parser_test.go +++ b/internal/component/loki/source/azure_event_hubs/internal/parser/parser_test.go @@ -249,6 +249,40 @@ func Test_parseMessage_custom_message_and_logic_app_logs_disallowCustomMessages( assert.Error(t, err) } +func Test_parseMessage_message_without_time_with_time_stamp(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{ + DisallowCustomMessages: true, + } + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/message_without_time_with_time_stamp.json"), + Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC), + } + + entries, err := messageParser.Parse(message, nil, nil, true) + assert.NoError(t, err) + assert.Len(t, entries, 1) + + expectedLine1 := "{\n \"timeStamp\": \"2024-09-18T00:45:09+00:00\",\n \"resourceId\": \"/RESOURCE_ID\",\n \"operationName\": \"ApplicationGatewayAccess\",\n \"category\": \"ApplicationGatewayAccessLog\"\n }" + assert.Equal(t, expectedLine1, entries[0].Line) + + assert.Equal(t, time.Date(2024, time.September, 18, 00, 45, 9, 0, time.UTC), entries[0].Timestamp) +} + +func Test_parseMessage_message_without_time_and_time_stamp(t *testing.T) { + messageParser := &AzureEventHubsTargetMessageParser{ + DisallowCustomMessages: true, + } + + message := &sarama.ConsumerMessage{ + Value: readFile(t, "testdata/message_without_time_and_time_stamp.json"), + Timestamp: time.Date(2023, time.March, 17, 8, 44, 02, 0, time.UTC), + } + + _, err := messageParser.Parse(message, nil, nil, true) + assert.EqualError(t, err, "required field or fields is empty") +} + func readFile(t *testing.T, filename string) []byte { data, err := os.ReadFile(filename) assert.NoError(t, err) diff --git a/internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_and_time_stamp.json b/internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_and_time_stamp.json new file mode 100644 index 0000000000..f9fc41ad02 --- /dev/null +++ b/internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_and_time_stamp.json @@ -0,0 +1,9 @@ +{ + "records": [ + { + "resourceId": "/RESOURCE_ID", + "operationName": "ApplicationGatewayAccess", + "category": "ApplicationGatewayAccessLog" + } + ] +} \ No newline at end of file diff --git a/internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_with_time_stamp.json b/internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_with_time_stamp.json new file mode 100644 index 0000000000..8579fc4897 --- /dev/null +++ b/internal/component/loki/source/azure_event_hubs/internal/parser/testdata/message_without_time_with_time_stamp.json @@ -0,0 +1,10 @@ +{ + "records": [ + { + "timeStamp": "2024-09-18T00:45:09+00:00", + "resourceId": "/RESOURCE_ID", + "operationName": "ApplicationGatewayAccess", + "category": "ApplicationGatewayAccessLog" + } + ] +} \ No newline at end of file