From 88eeca5190ab6df16692240d1a4b1a4b7aa8c362 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 17 Jan 2025 12:25:05 +0530 Subject: [PATCH 1/2] update usage tracker with received bytes on stream level instead of each log line --- pkg/loghttp/push/otlp.go | 20 +++++++++----------- pkg/loghttp/push/push.go | 13 ++++++++----- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 584b45a833b71..dbb4ec8349e63 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -185,6 +185,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten labelsStr := streamLabels.String() lbs := modelLabelsSetToLabelsList(streamLabels) + totalBytesReceived := int64(0) if _, ok := pushRequestsByStream[labelsStr]; !ok { pushRequestsByStream[labelsStr] = logproto.Stream{ @@ -197,9 +198,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten retentionPeriodForUser := tenantsRetention.RetentionPeriodFor(userID, lbs) stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(resourceAttributesAsStructuredMetadataSize) - if tracker != nil { - tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(resourceAttributesAsStructuredMetadataSize)) - } + totalBytesReceived += int64(resourceAttributesAsStructuredMetadataSize) stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], resourceAttributesAsStructuredMetadata...) @@ -252,9 +251,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten scopeAttributesAsStructuredMetadataSize := loki_util.StructuredMetadataSize(scopeAttributesAsStructuredMetadata) stats.StructuredMetadataBytes[retentionPeriodForUser] += int64(scopeAttributesAsStructuredMetadataSize) - if tracker != nil { - tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(scopeAttributesAsStructuredMetadataSize)) - } + totalBytesReceived += int64(scopeAttributesAsStructuredMetadataSize) stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser] = append(stats.ResourceAndSourceMetadataLabels[retentionPeriodForUser], scopeAttributesAsStructuredMetadata...) for k := 0; k < logs.Len(); k++ { @@ -279,17 +276,18 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, ten metadataSize := int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata) - resourceAttributesAsStructuredMetadataSize - scopeAttributesAsStructuredMetadataSize) stats.StructuredMetadataBytes[retentionPeriodForUser] += metadataSize stats.LogLinesBytes[retentionPeriodForUser] += int64(len(entry.Line)) - - if tracker != nil { - tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(len(entry.Line))) - tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(metadataSize)) - } + totalBytesReceived += metadataSize + totalBytesReceived += int64(len(entry.Line)) stats.NumLines++ if entry.Timestamp.After(stats.MostRecentEntryTimestamp) { stats.MostRecentEntryTimestamp = entry.Timestamp } } + + if tracker != nil { + tracker.ReceivedBytesAdd(ctx, userID, retentionPeriodForUser, lbs, float64(totalBytesReceived)) + } } } diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 9da5b29722643..b9466fa78e48a 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -281,22 +281,25 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe if tenantsRetention != nil { retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) } + totalBytesReceived := int64(0) + for _, e := range s.Entries { pushStats.NumLines++ entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata)) pushStats.LogLinesBytes[retentionPeriod] += int64(len(e.Line)) pushStats.StructuredMetadataBytes[retentionPeriod] += entryLabelsSize - - if tracker != nil { - tracker.ReceivedBytesAdd(r.Context(), userID, retentionPeriod, lbs, float64(len(e.Line))) - tracker.ReceivedBytesAdd(r.Context(), userID, retentionPeriod, lbs, float64(entryLabelsSize)) - } + totalBytesReceived += int64(len(e.Line)) + totalBytesReceived += entryLabelsSize if e.Timestamp.After(pushStats.MostRecentEntryTimestamp) { pushStats.MostRecentEntryTimestamp = e.Timestamp } } + if tracker != nil { + tracker.ReceivedBytesAdd(r.Context(), userID, retentionPeriod, lbs, float64(totalBytesReceived)) + } + req.Streams[i] = s } From cf30684ab3904d18ef0496ee42b5009dfff2e5c9 Mon Sep 17 00:00:00 2001 From: Sandeep Sukhani Date: Fri, 17 Jan 2025 12:38:40 +0530 Subject: [PATCH 2/2] lint --- pkg/loghttp/push/push.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index b9466fa78e48a..759e21f293ede 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -282,7 +282,7 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) } totalBytesReceived := int64(0) - + for _, e := range s.Entries { pushStats.NumLines++ entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata))