Skip to content

Commit

Permalink
Added request level metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabh1007 committed Sep 13, 2024
1 parent 0fdb5e4 commit c47f59b
Show file tree
Hide file tree
Showing 10 changed files with 220 additions and 54 deletions.
1 change: 1 addition & 0 deletions Microsoft.Azure.Cosmos/src/DocumentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace Microsoft.Azure.Cosmos
using global::Azure.Core;
using Microsoft.Azure.Cosmos.Common;
using Microsoft.Azure.Cosmos.Core.Trace;
using Microsoft.Azure.Cosmos.Handler;
using Microsoft.Azure.Cosmos.Query;
using Microsoft.Azure.Cosmos.Query.Core.QueryPlan;
using Microsoft.Azure.Cosmos.Routing;
Expand Down
10 changes: 5 additions & 5 deletions Microsoft.Azure.Cosmos/src/Handler/DiagnosticsHandlerHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ namespace Microsoft.Azure.Cosmos.Handler
/// </summary>
internal class DiagnosticsHandlerHelper
{
private const string Diagnostickey = "diagnostic";
private const string Telemetrykey = "telemetry";
private const string DiagnosticKey = "diagnostic";
private const string TelemetryKey = "telemetry";

public static readonly TimeSpan DiagnosticsRefreshInterval = TimeSpan.FromSeconds(10);
private static readonly TimeSpan ClientTelemetryRefreshInterval = TimeSpan.FromSeconds(5);

// Need to reset it in Tests hence kept it non-readonly.
private static SystemUsageRecorder DiagnosticSystemUsageRecorder = new SystemUsageRecorder(
identifier: Diagnostickey,
private static readonly SystemUsageRecorder DiagnosticSystemUsageRecorder = new SystemUsageRecorder(
identifier: DiagnosticKey,
historyLength: 6,
refreshInterval: DiagnosticsHandlerHelper.DiagnosticsRefreshInterval);
private static SystemUsageRecorder TelemetrySystemUsageRecorder = null;
Expand Down Expand Up @@ -103,7 +103,7 @@ private DiagnosticsHandlerHelper()
{
// re-initialize a fresh telemetry recorder when feature is switched on
DiagnosticsHandlerHelper.TelemetrySystemUsageRecorder = new SystemUsageRecorder(
identifier: Telemetrykey,
identifier: TelemetryKey,
historyLength: 120,
refreshInterval: DiagnosticsHandlerHelper.ClientTelemetryRefreshInterval);

Expand Down
57 changes: 28 additions & 29 deletions Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Handlers
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -26,38 +27,36 @@ public override async Task<ResponseMessage> SendAsync(
CancellationToken cancellationToken)
{
ResponseMessage response = await base.SendAsync(request, cancellationToken);
if (this.IsAllowed(request))
try
{
try
{
this.telemetryToServiceHelper.GetCollector().CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace
});
}
catch (Exception ex)
{
DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex);
}
this.telemetryToServiceHelper
.GetCollectors(request)
.ForEach((collector) => collector.CollectOperationAndNetworkInfo(
() => new TelemetryInformation
{
RegionsContactedList = response.Diagnostics.GetContactedRegions(),
RequestLatency = response.Diagnostics.GetClientElapsedTime(),
StatusCode = response.StatusCode,
ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response),
ContainerId = request.ContainerId,
DatabaseId = request.DatabaseId,
OperationType = request.OperationType,
ResourceType = request.ResourceType,
ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel],
RequestCharge = response.Headers.RequestCharge,
SubStatusCode = response.Headers.SubStatusCode,
Trace = response.Trace,
MaxItemCount = request.Headers.PageSize,
ActualItemCount = response.Headers.ItemCount,
PartitionKeyRangeId = request.Headers.PartitionKeyRangeId
}));
}
catch (Exception ex)
{
DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex);
}
return response;
}

private bool IsAllowed(RequestMessage request)
{
return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType);
return response;
}

/// <summary>
Expand Down
4 changes: 2 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ await this.storeModel.ProcessMessageAsync(request))
{
ContainerProperties containerProperties = CosmosResource.FromStream<ContainerProperties>(response);

this.telemetryToServiceHelper.GetCollector().CollectCacheInfo(
this.telemetryToServiceHelper.GetCollectors().ForEach((collector) => collector.CollectCacheInfo(
ClientCollectionCache.TelemetrySourceName,
() => new TelemetryInformation
{
Expand All @@ -227,7 +227,7 @@ await this.storeModel.ProcessMessageAsync(request))
ResourceType = request.ResourceType,
SubStatusCode = response.SubStatusCode,
CollectionLink = collectionLink
});
}));

return containerProperties;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//------------------------------------------------------------

namespace Microsoft.Azure.Cosmos.Telemetry
{
using System;
using System.Collections.Generic;
using Microsoft.Azure.Cosmos.Telemetry.Collector;

/// <summary>
/// The OpenTelemetryMetricsCollector class is responsible for collecting and recording Cosmos DB operational metrics, such as item counts, request latency, request units, and regions contacted.
/// This data is captured using the OpenTelemetry metrics API, which allows tracking and analysis of Cosmos DB operations at a granular level.
/// </summary>
internal class OpenTelemetryMetricsCollector : ITelemetryCollector
{
private readonly string clientId;
private readonly string accountName;

/// <summary>
/// Initializes a new instance of the OpenTelemetryMetricsCollector class.
/// </summary>
/// <param name="clientId">A unique identifier for the Cosmos DB client instance</param>
/// <param name="accountName">The Cosmos DB account name.</param>
public OpenTelemetryMetricsCollector(string clientId, string accountName)
{
this.clientId = clientId;
this.accountName = accountName;
}

public void CollectCacheInfo(string cacheName, Func<TelemetryInformation> getTelemetryInformation)
{
// No OP
}

/// <summary>
/// Collects telemetry data related to operations and network information, including request performance, item counts, and regions contacted.
/// </summary>
/// <param name="getTelemetryInformation"> A function that provides telemetry details such as operation type, status code, consistency level, and request charge.</param>
/// <remarks>This method gathers telemetry information, including details such as the database, container, operation type, status code, consistency level, and partition key range ID. It uses these dimensions to push metrics to OpenTelemetry, enabling tracking of performance metrics such as request latency, request charge, and item counts.</remarks>
public void CollectOperationAndNetworkInfo(Func<TelemetryInformation> getTelemetryInformation)
{
TelemetryInformation telemetryInformation = getTelemetryInformation();

KeyValuePair<string, object>[] dimensions = new[]
{
new KeyValuePair<string, object>("Container", $"{this.accountName}/{telemetryInformation.DatabaseId}/{telemetryInformation.ContainerId}"),
new KeyValuePair<string, object>("Operation", telemetryInformation.OperationType),
new KeyValuePair<string, object>("OperationStatusCode", telemetryInformation.StatusCode),
new KeyValuePair<string, object>("ClientCorrelationId", this.clientId),
new KeyValuePair<string, object>("ConsistencyLevel", telemetryInformation.ConsistencyLevel),
new KeyValuePair<string, object>("PartitionKeyRangeId", telemetryInformation.PartitionKeyRangeId),
};

PushOperationLevelMetrics(telemetryInformation, dimensions);
}

/// <summary>
/// Pushes various operation-level metrics to OpenTelemetry.
/// </summary>
/// <param name="telemetryInformation">Contains telemetry data related to the operation, such as item counts, request charge, and latency.</param>
/// <param name="dimensions">Key-value pairs representing various metadata about the operation (e.g., container, operation type, consistency level).</param>
private static void PushOperationLevelMetrics(TelemetryInformation telemetryInformation, KeyValuePair<string, object>[] dimensions)
{
OpenTelemetryMetrics.MaxItemCounter.Add(Convert.ToInt32(telemetryInformation.MaxItemCount), dimensions);
OpenTelemetryMetrics.ActualItemCounter.Add(Convert.ToInt32(telemetryInformation.ActualItemCount), dimensions);
OpenTelemetryMetrics.RegionsContactedCounter.Add(telemetryInformation.RegionsContactedList.Count, dimensions);
OpenTelemetryMetrics.RequestUnitsHistogram.Record(telemetryInformation.RequestCharge, dimensions);
OpenTelemetryMetrics.RequestLatencyHistogram.Record(telemetryInformation.RequestLatency.Value.Milliseconds, dimensions);
OpenTelemetryMetrics.NumberOfOperationCallCounter.Add(1, dimensions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,9 @@ internal class TelemetryInformation
internal double RequestCharge { get; set; } // Required only for operation level telemetry
internal string CollectionLink { get; set; } = null; // Required only for collection cache telemetry
internal ITrace Trace { get; set; } // Required to fetch network level telemetry out of the trace object

internal string MaxItemCount { get; set; }
internal string ActualItemCount { get; set; }
internal string PartitionKeyRangeId { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// ------------------------------------------------------------

namespace Microsoft.Azure.Cosmos
{
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;
using System.Text;

/// <summary>
/// The OpenTelemetryMetrics class contains internal static members to create and record Cosmos DB SDK metrics using OpenTelemetry. These metrics allow you to monitor the performance and resource consumption of Cosmos DB operations.
/// </summary>
internal static class OpenTelemetryMetrics
{
private static readonly Meter Meter = new Meter("Azure.Cosmos.SDK.Metrics");

internal static readonly Counter<int> NumberOfOperationCallCounter =
Meter.CreateCounter<int>("cosmos.client.op.calls", "#", "Number of operation calls");

internal static readonly Histogram<double> RequestLatencyHistogram =
Meter.CreateHistogram<double>("cosmos.client.op.latency", "#", "Total end-to-end duration of the operation");

internal static readonly Histogram<double> RequestUnitsHistogram =
Meter.CreateHistogram<double>("cosmos.client.op.RUs", "#", "Total request units per operation (sum of RUs for all requested needed when processing an operation)");

internal static readonly Counter<int> MaxItemCounter =
Meter.CreateCounter<int>("cosmos.client.op.maxItemCount", "#", "For feed operations (query, readAll, readMany, change feed) and batch operations this meter capture the requested maxItemCount per page/request");

internal static readonly Counter<int> ActualItemCounter =
Meter.CreateCounter<int>("cosmos.client.op.actualItemCount", "#", "For feed operations (query, readAll, readMany, change feed) batch operations this meter capture the actual item count in responses from the service");

internal static readonly Counter<int> RegionsContactedCounter =
Meter.CreateCounter<int>("cosmos.client.op.regionsContacted", "#", "Number of regions contacted when executing an operation");
}
}
42 changes: 31 additions & 11 deletions Microsoft.Azure.Cosmos/src/Telemetry/TelemetryToServiceHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Microsoft.Azure.Cosmos.Telemetry
{
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -19,24 +20,25 @@ namespace Microsoft.Azure.Cosmos.Telemetry

internal class TelemetryToServiceHelper : IDisposable
{
private readonly OpenTelemetryMetricsCollector openTelemetryCollector = null;
private ITelemetryCollector collector = new TelemetryCollectorNoOp();

internal static TimeSpan DefaultBackgroundRefreshClientConfigTimeInterval
= TimeSpan.FromMinutes(10);

private readonly AuthorizationTokenProvider cosmosAuthorization;
private readonly CosmosHttpClient httpClient;
private readonly Uri serviceEnpoint;
private readonly Uri serviceEndpoint;
private readonly ConnectionPolicy connectionPolicy;
private readonly string clientId;
private readonly GlobalEndpointManager globalEndpointManager;
private readonly CancellationTokenSource cancellationTokenSource;

private ClientTelemetry clientTelemetry = null;

private TelemetryToServiceHelper()
private TelemetryToServiceHelper(string clientId, string accountName)
{
//NoOpConstructor
this.openTelemetryCollector = new OpenTelemetryMetricsCollector(clientId: clientId, accountName: accountName);
}

private TelemetryToServiceHelper(
Expand All @@ -52,9 +54,11 @@ private TelemetryToServiceHelper(
this.cosmosAuthorization = cosmosAuthorization;
this.httpClient = httpClient;
this.connectionPolicy = connectionPolicy;
this.serviceEnpoint = serviceEndpoint;
this.serviceEndpoint = serviceEndpoint;
this.globalEndpointManager = globalEndpointManager;
this.cancellationTokenSource = cancellationTokenSource;

this.openTelemetryCollector = new OpenTelemetryMetricsCollector(clientId: clientId, accountName: serviceEndpoint.Host);
}

public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemetryJob(string clientId,
Expand All @@ -66,11 +70,11 @@ public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemet
CancellationTokenSource cancellationTokenSource)
{
#if INTERNAL
return new TelemetryToServiceHelper();
return new TelemetryToServiceHelper(clientId: clientId, accountName: serviceEndpoint.Host);
#else
if (connectionPolicy.CosmosClientTelemetryOptions.DisableSendingMetricsToService)
{
return new TelemetryToServiceHelper();
return new TelemetryToServiceHelper(clientId: clientId, accountName: serviceEndpoint.Host);
}

TelemetryToServiceHelper helper = new TelemetryToServiceHelper(
Expand All @@ -82,7 +86,7 @@ public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemet
globalEndpointManager: globalEndpointManager,
cancellationTokenSource: cancellationTokenSource);

_ = helper.RetrieveConfigAndInitiateTelemetryAsync(); // Let it run in backgroud
_ = helper.RetrieveConfigAndInitiateTelemetryAsync(); // Let it run in background

return helper;
#endif
Expand All @@ -92,7 +96,7 @@ private async Task RetrieveConfigAndInitiateTelemetryAsync()
{
try
{
Uri serviceEndpointWithPath = new Uri(this.serviceEnpoint + Paths.ClientConfigPathSegment);
Uri serviceEndpointWithPath = new Uri(this.serviceEndpoint + Paths.ClientConfigPathSegment);
while (!this.cancellationTokenSource.IsCancellationRequested)
{
TryCatch<AccountClientConfiguration> databaseAccountClientConfigs = await this.GetDatabaseAccountClientConfigAsync(
Expand Down Expand Up @@ -172,9 +176,25 @@ await cosmosAuthorization.AddAuthorizationHeaderAsync(
}
}

public ITelemetryCollector GetCollector()
public List<ITelemetryCollector> GetCollectors(RequestMessage request = null)
{
List<ITelemetryCollector> collectors = new List<ITelemetryCollector>(2);
if (request is null || IsAllowed(request))
{
collectors.Add(this.collector);
}

if (this.openTelemetryCollector != null)
{
collectors.Add(this.openTelemetryCollector);
}

return collectors;
}

private static bool IsAllowed(RequestMessage request)
{
return this.collector;
return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType);
}

public bool IsClientTelemetryJobRunning()
Expand Down
Loading

0 comments on commit c47f59b

Please sign in to comment.