From c47f59b17c0feec23fe710fa4a15fe9eaba60878 Mon Sep 17 00:00:00 2001 From: Sourabh Jain Date: Fri, 13 Sep 2024 08:19:52 +0530 Subject: [PATCH] Added request level metrics --- Microsoft.Azure.Cosmos/src/DocumentClient.cs | 1 + .../src/Handler/DiagnosticsHandlerHelper.cs | 10 +-- .../src/Handler/TelemetryHandler.cs | 57 +++++++-------- .../src/Routing/ClientCollectionCache.cs | 4 +- .../OpenTelemetryMetricsCollector.cs | 73 +++++++++++++++++++ .../Collector/TelemetryInformation.cs | 4 + .../OpenTelemetry/OpenTelemetryMetrics.cs | 37 ++++++++++ .../src/Telemetry/TelemetryToServiceHelper.cs | 42 ++++++++--- .../ClientCreateAndInitializeTest.cs | 41 +++++++++-- ...icrosoft.Azure.Cosmos.EmulatorTests.csproj | 5 +- 10 files changed, 220 insertions(+), 54 deletions(-) create mode 100644 Microsoft.Azure.Cosmos/src/Telemetry/Collector/OpenTelemetryMetricsCollector.cs create mode 100644 Microsoft.Azure.Cosmos/src/Telemetry/OpenTelemetry/OpenTelemetryMetrics.cs diff --git a/Microsoft.Azure.Cosmos/src/DocumentClient.cs b/Microsoft.Azure.Cosmos/src/DocumentClient.cs index 946fa4cb08..47cee3162c 100644 --- a/Microsoft.Azure.Cosmos/src/DocumentClient.cs +++ b/Microsoft.Azure.Cosmos/src/DocumentClient.cs @@ -20,6 +20,7 @@ namespace Microsoft.Azure.Cosmos using global::Azure.Core; using Microsoft.Azure.Cosmos.Common; using Microsoft.Azure.Cosmos.Core.Trace; + using Microsoft.Azure.Cosmos.Handler; using Microsoft.Azure.Cosmos.Query; using Microsoft.Azure.Cosmos.Query.Core.QueryPlan; using Microsoft.Azure.Cosmos.Routing; diff --git a/Microsoft.Azure.Cosmos/src/Handler/DiagnosticsHandlerHelper.cs b/Microsoft.Azure.Cosmos/src/Handler/DiagnosticsHandlerHelper.cs index 266524e38b..513b7e5f23 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/DiagnosticsHandlerHelper.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/DiagnosticsHandlerHelper.cs @@ -18,15 +18,15 @@ namespace Microsoft.Azure.Cosmos.Handler /// internal class DiagnosticsHandlerHelper { - private const string Diagnostickey = "diagnostic"; - private const string Telemetrykey = "telemetry"; + private const string DiagnosticKey = "diagnostic"; + private const string TelemetryKey = "telemetry"; public static readonly TimeSpan DiagnosticsRefreshInterval = TimeSpan.FromSeconds(10); private static readonly TimeSpan ClientTelemetryRefreshInterval = TimeSpan.FromSeconds(5); // Need to reset it in Tests hence kept it non-readonly. - private static SystemUsageRecorder DiagnosticSystemUsageRecorder = new SystemUsageRecorder( - identifier: Diagnostickey, + private static readonly SystemUsageRecorder DiagnosticSystemUsageRecorder = new SystemUsageRecorder( + identifier: DiagnosticKey, historyLength: 6, refreshInterval: DiagnosticsHandlerHelper.DiagnosticsRefreshInterval); private static SystemUsageRecorder TelemetrySystemUsageRecorder = null; @@ -103,7 +103,7 @@ private DiagnosticsHandlerHelper() { // re-initialize a fresh telemetry recorder when feature is switched on DiagnosticsHandlerHelper.TelemetrySystemUsageRecorder = new SystemUsageRecorder( - identifier: Telemetrykey, + identifier: TelemetryKey, historyLength: 120, refreshInterval: DiagnosticsHandlerHelper.ClientTelemetryRefreshInterval); diff --git a/Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs b/Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs index 039373718f..7f07ccae59 100644 --- a/Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs +++ b/Microsoft.Azure.Cosmos/src/Handler/TelemetryHandler.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.Handlers { using System; + using System.Collections.Generic; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -26,38 +27,36 @@ public override async Task SendAsync( CancellationToken cancellationToken) { ResponseMessage response = await base.SendAsync(request, cancellationToken); - if (this.IsAllowed(request)) + try { - try - { - this.telemetryToServiceHelper.GetCollector().CollectOperationAndNetworkInfo( - () => new TelemetryInformation - { - RegionsContactedList = response.Diagnostics.GetContactedRegions(), - RequestLatency = response.Diagnostics.GetClientElapsedTime(), - StatusCode = response.StatusCode, - ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response), - ContainerId = request.ContainerId, - DatabaseId = request.DatabaseId, - OperationType = request.OperationType, - ResourceType = request.ResourceType, - ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel], - RequestCharge = response.Headers.RequestCharge, - SubStatusCode = response.Headers.SubStatusCode, - Trace = response.Trace - }); - } - catch (Exception ex) - { - DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex); - } + this.telemetryToServiceHelper + .GetCollectors(request) + .ForEach((collector) => collector.CollectOperationAndNetworkInfo( + () => new TelemetryInformation + { + RegionsContactedList = response.Diagnostics.GetContactedRegions(), + RequestLatency = response.Diagnostics.GetClientElapsedTime(), + StatusCode = response.StatusCode, + ResponseSizeInBytes = TelemetryHandler.GetPayloadSize(response), + ContainerId = request.ContainerId, + DatabaseId = request.DatabaseId, + OperationType = request.OperationType, + ResourceType = request.ResourceType, + ConsistencyLevel = request.Headers?[Documents.HttpConstants.HttpHeaders.ConsistencyLevel], + RequestCharge = response.Headers.RequestCharge, + SubStatusCode = response.Headers.SubStatusCode, + Trace = response.Trace, + MaxItemCount = request.Headers.PageSize, + ActualItemCount = response.Headers.ItemCount, + PartitionKeyRangeId = request.Headers.PartitionKeyRangeId + })); + } + catch (Exception ex) + { + DefaultTrace.TraceError("Error while collecting telemetry information : {0}", ex); } - return response; - } - private bool IsAllowed(RequestMessage request) - { - return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType); + return response; } /// diff --git a/Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs b/Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs index 534a0b8a22..d643eeba7e 100644 --- a/Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs +++ b/Microsoft.Azure.Cosmos/src/Routing/ClientCollectionCache.cs @@ -216,7 +216,7 @@ await this.storeModel.ProcessMessageAsync(request)) { ContainerProperties containerProperties = CosmosResource.FromStream(response); - this.telemetryToServiceHelper.GetCollector().CollectCacheInfo( + this.telemetryToServiceHelper.GetCollectors().ForEach((collector) => collector.CollectCacheInfo( ClientCollectionCache.TelemetrySourceName, () => new TelemetryInformation { @@ -227,7 +227,7 @@ await this.storeModel.ProcessMessageAsync(request)) ResourceType = request.ResourceType, SubStatusCode = response.SubStatusCode, CollectionLink = collectionLink - }); + })); return containerProperties; } diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/Collector/OpenTelemetryMetricsCollector.cs b/Microsoft.Azure.Cosmos/src/Telemetry/Collector/OpenTelemetryMetricsCollector.cs new file mode 100644 index 0000000000..6542ae94e7 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Telemetry/Collector/OpenTelemetryMetricsCollector.cs @@ -0,0 +1,73 @@ +//------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +//------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos.Telemetry +{ + using System; + using System.Collections.Generic; + using Microsoft.Azure.Cosmos.Telemetry.Collector; + + /// + /// The OpenTelemetryMetricsCollector class is responsible for collecting and recording Cosmos DB operational metrics, such as item counts, request latency, request units, and regions contacted. + /// This data is captured using the OpenTelemetry metrics API, which allows tracking and analysis of Cosmos DB operations at a granular level. + /// + internal class OpenTelemetryMetricsCollector : ITelemetryCollector + { + private readonly string clientId; + private readonly string accountName; + + /// + /// Initializes a new instance of the OpenTelemetryMetricsCollector class. + /// + /// A unique identifier for the Cosmos DB client instance + /// The Cosmos DB account name. + public OpenTelemetryMetricsCollector(string clientId, string accountName) + { + this.clientId = clientId; + this.accountName = accountName; + } + + public void CollectCacheInfo(string cacheName, Func getTelemetryInformation) + { + // No OP + } + + /// + /// Collects telemetry data related to operations and network information, including request performance, item counts, and regions contacted. + /// + /// A function that provides telemetry details such as operation type, status code, consistency level, and request charge. + /// This method gathers telemetry information, including details such as the database, container, operation type, status code, consistency level, and partition key range ID. It uses these dimensions to push metrics to OpenTelemetry, enabling tracking of performance metrics such as request latency, request charge, and item counts. + public void CollectOperationAndNetworkInfo(Func getTelemetryInformation) + { + TelemetryInformation telemetryInformation = getTelemetryInformation(); + + KeyValuePair[] dimensions = new[] + { + new KeyValuePair("Container", $"{this.accountName}/{telemetryInformation.DatabaseId}/{telemetryInformation.ContainerId}"), + new KeyValuePair("Operation", telemetryInformation.OperationType), + new KeyValuePair("OperationStatusCode", telemetryInformation.StatusCode), + new KeyValuePair("ClientCorrelationId", this.clientId), + new KeyValuePair("ConsistencyLevel", telemetryInformation.ConsistencyLevel), + new KeyValuePair("PartitionKeyRangeId", telemetryInformation.PartitionKeyRangeId), + }; + + PushOperationLevelMetrics(telemetryInformation, dimensions); + } + + /// + /// Pushes various operation-level metrics to OpenTelemetry. + /// + /// Contains telemetry data related to the operation, such as item counts, request charge, and latency. + /// Key-value pairs representing various metadata about the operation (e.g., container, operation type, consistency level). + private static void PushOperationLevelMetrics(TelemetryInformation telemetryInformation, KeyValuePair[] dimensions) + { + OpenTelemetryMetrics.MaxItemCounter.Add(Convert.ToInt32(telemetryInformation.MaxItemCount), dimensions); + OpenTelemetryMetrics.ActualItemCounter.Add(Convert.ToInt32(telemetryInformation.ActualItemCount), dimensions); + OpenTelemetryMetrics.RegionsContactedCounter.Add(telemetryInformation.RegionsContactedList.Count, dimensions); + OpenTelemetryMetrics.RequestUnitsHistogram.Record(telemetryInformation.RequestCharge, dimensions); + OpenTelemetryMetrics.RequestLatencyHistogram.Record(telemetryInformation.RequestLatency.Value.Milliseconds, dimensions); + OpenTelemetryMetrics.NumberOfOperationCallCounter.Add(1, dimensions); + } + } +} diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/Collector/TelemetryInformation.cs b/Microsoft.Azure.Cosmos/src/Telemetry/Collector/TelemetryInformation.cs index a8210fb0a4..580561d6b4 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/Collector/TelemetryInformation.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/Collector/TelemetryInformation.cs @@ -26,5 +26,9 @@ internal class TelemetryInformation internal double RequestCharge { get; set; } // Required only for operation level telemetry internal string CollectionLink { get; set; } = null; // Required only for collection cache telemetry internal ITrace Trace { get; set; } // Required to fetch network level telemetry out of the trace object + + internal string MaxItemCount { get; set; } + internal string ActualItemCount { get; set; } + internal string PartitionKeyRangeId { get; set; } } } \ No newline at end of file diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/OpenTelemetry/OpenTelemetryMetrics.cs b/Microsoft.Azure.Cosmos/src/Telemetry/OpenTelemetry/OpenTelemetryMetrics.cs new file mode 100644 index 0000000000..ed0eebbc95 --- /dev/null +++ b/Microsoft.Azure.Cosmos/src/Telemetry/OpenTelemetry/OpenTelemetryMetrics.cs @@ -0,0 +1,37 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. All rights reserved. +// ------------------------------------------------------------ + +namespace Microsoft.Azure.Cosmos +{ + using System; + using System.Collections.Generic; + using System.Diagnostics.Metrics; + using System.Text; + + /// + /// The OpenTelemetryMetrics class contains internal static members to create and record Cosmos DB SDK metrics using OpenTelemetry. These metrics allow you to monitor the performance and resource consumption of Cosmos DB operations. + /// + internal static class OpenTelemetryMetrics + { + private static readonly Meter Meter = new Meter("Azure.Cosmos.SDK.Metrics"); + + internal static readonly Counter NumberOfOperationCallCounter = + Meter.CreateCounter("cosmos.client.op.calls", "#", "Number of operation calls"); + + internal static readonly Histogram RequestLatencyHistogram = + Meter.CreateHistogram("cosmos.client.op.latency", "#", "Total end-to-end duration of the operation"); + + internal static readonly Histogram RequestUnitsHistogram = + Meter.CreateHistogram("cosmos.client.op.RUs", "#", "Total request units per operation (sum of RUs for all requested needed when processing an operation)"); + + internal static readonly Counter MaxItemCounter = + Meter.CreateCounter("cosmos.client.op.maxItemCount", "#", "For feed operations (query, readAll, readMany, change feed) and batch operations this meter capture the requested maxItemCount per page/request"); + + internal static readonly Counter ActualItemCounter = + Meter.CreateCounter("cosmos.client.op.actualItemCount", "#", "For feed operations (query, readAll, readMany, change feed) batch operations this meter capture the actual item count in responses from the service"); + + internal static readonly Counter RegionsContactedCounter = + Meter.CreateCounter("cosmos.client.op.regionsContacted", "#", "Number of regions contacted when executing an operation"); + } +} diff --git a/Microsoft.Azure.Cosmos/src/Telemetry/TelemetryToServiceHelper.cs b/Microsoft.Azure.Cosmos/src/Telemetry/TelemetryToServiceHelper.cs index 4b94811b67..5ee15cd1f8 100644 --- a/Microsoft.Azure.Cosmos/src/Telemetry/TelemetryToServiceHelper.cs +++ b/Microsoft.Azure.Cosmos/src/Telemetry/TelemetryToServiceHelper.cs @@ -5,6 +5,7 @@ namespace Microsoft.Azure.Cosmos.Telemetry { using System; + using System.Collections.Generic; using System.Net.Http; using System.Threading; using System.Threading.Tasks; @@ -19,14 +20,15 @@ namespace Microsoft.Azure.Cosmos.Telemetry internal class TelemetryToServiceHelper : IDisposable { + private readonly OpenTelemetryMetricsCollector openTelemetryCollector = null; private ITelemetryCollector collector = new TelemetryCollectorNoOp(); - + internal static TimeSpan DefaultBackgroundRefreshClientConfigTimeInterval = TimeSpan.FromMinutes(10); private readonly AuthorizationTokenProvider cosmosAuthorization; private readonly CosmosHttpClient httpClient; - private readonly Uri serviceEnpoint; + private readonly Uri serviceEndpoint; private readonly ConnectionPolicy connectionPolicy; private readonly string clientId; private readonly GlobalEndpointManager globalEndpointManager; @@ -34,9 +36,9 @@ internal static TimeSpan DefaultBackgroundRefreshClientConfigTimeInterval private ClientTelemetry clientTelemetry = null; - private TelemetryToServiceHelper() + private TelemetryToServiceHelper(string clientId, string accountName) { - //NoOpConstructor + this.openTelemetryCollector = new OpenTelemetryMetricsCollector(clientId: clientId, accountName: accountName); } private TelemetryToServiceHelper( @@ -52,9 +54,11 @@ private TelemetryToServiceHelper( this.cosmosAuthorization = cosmosAuthorization; this.httpClient = httpClient; this.connectionPolicy = connectionPolicy; - this.serviceEnpoint = serviceEndpoint; + this.serviceEndpoint = serviceEndpoint; this.globalEndpointManager = globalEndpointManager; this.cancellationTokenSource = cancellationTokenSource; + + this.openTelemetryCollector = new OpenTelemetryMetricsCollector(clientId: clientId, accountName: serviceEndpoint.Host); } public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemetryJob(string clientId, @@ -66,11 +70,11 @@ public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemet CancellationTokenSource cancellationTokenSource) { #if INTERNAL - return new TelemetryToServiceHelper(); + return new TelemetryToServiceHelper(clientId: clientId, accountName: serviceEndpoint.Host); #else if (connectionPolicy.CosmosClientTelemetryOptions.DisableSendingMetricsToService) { - return new TelemetryToServiceHelper(); + return new TelemetryToServiceHelper(clientId: clientId, accountName: serviceEndpoint.Host); } TelemetryToServiceHelper helper = new TelemetryToServiceHelper( @@ -82,7 +86,7 @@ public static TelemetryToServiceHelper CreateAndInitializeClientConfigAndTelemet globalEndpointManager: globalEndpointManager, cancellationTokenSource: cancellationTokenSource); - _ = helper.RetrieveConfigAndInitiateTelemetryAsync(); // Let it run in backgroud + _ = helper.RetrieveConfigAndInitiateTelemetryAsync(); // Let it run in background return helper; #endif @@ -92,7 +96,7 @@ private async Task RetrieveConfigAndInitiateTelemetryAsync() { try { - Uri serviceEndpointWithPath = new Uri(this.serviceEnpoint + Paths.ClientConfigPathSegment); + Uri serviceEndpointWithPath = new Uri(this.serviceEndpoint + Paths.ClientConfigPathSegment); while (!this.cancellationTokenSource.IsCancellationRequested) { TryCatch databaseAccountClientConfigs = await this.GetDatabaseAccountClientConfigAsync( @@ -172,9 +176,25 @@ await cosmosAuthorization.AddAuthorizationHeaderAsync( } } - public ITelemetryCollector GetCollector() + public List GetCollectors(RequestMessage request = null) + { + List collectors = new List(2); + if (request is null || IsAllowed(request)) + { + collectors.Add(this.collector); + } + + if (this.openTelemetryCollector != null) + { + collectors.Add(this.openTelemetryCollector); + } + + return collectors; + } + + private static bool IsAllowed(RequestMessage request) { - return this.collector; + return ClientTelemetryOptions.AllowedResourceTypes.Equals(request.ResourceType); } public bool IsClientTelemetryJobRunning() diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs index 1813700e05..922ae2ef7a 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/ClientCreateAndInitializeTest.cs @@ -13,6 +13,9 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; using Moq; using Moq.Protected; + using OpenTelemetry.Metrics; + using OpenTelemetry; + using System.Diagnostics; [TestClass] public class ClientCreateAndInitializeTest : BaseCosmosClientHelper @@ -54,6 +57,14 @@ public async Task Cleanup() [TestMethod] public async Task CreateAndInitializeTest() { + var histogramBuckets = new double[] { 0, 5, 10, 25, 50, 75, 100, 250, 500 }; + MeterProvider meterProvider = Sdk + .CreateMeterProviderBuilder() + .AddMeter("*") + .AddView("cosmos.client.op.RUs", new ExplicitBucketHistogramConfiguration { Boundaries = histogramBuckets }) + .AddConsoleExporter() + .Build(); + int httpCallsMade = 0; HttpClientHandlerHelper httpClientHandlerHelper = new HttpClientHandlerHelper { @@ -74,15 +85,35 @@ public async Task CreateAndInitializeTest() }; CosmosClient cosmosClient = await CosmosClient.CreateAndInitializeAsync(endpoint, authKey, containers, cosmosClientOptions); - Assert.IsNotNull(cosmosClient); + // Assert.IsNotNull(cosmosClient); int httpCallsMadeAfterCreation = httpCallsMade; ContainerInternal container = (ContainerInternal)cosmosClient.GetContainer(this.database.Id, "ClientCreateAndInitializeContainer"); - ItemResponse readResponse = await container.ReadItemAsync("1", new Cosmos.PartitionKey("Status1")); - string diagnostics = readResponse.Diagnostics.ToString(); - Assert.IsTrue(diagnostics.Contains("\"ConnectionMode\":\"Direct\"")); - Assert.AreEqual(httpCallsMade, httpCallsMadeAfterCreation); + + await Task.Delay(1000); + + Stopwatch sw = Stopwatch.StartNew(); + sw.Start(); + while(true) + { + ItemResponse readResponse = await container.ReadItemAsync("1", new Cosmos.PartitionKey("Status1")); + string diagnostics = readResponse.Diagnostics.ToString(); + if(sw.ElapsedMilliseconds > 2000) + { + break; + } + } + sw.Stop(); + + await Task.Delay(1000); + + /* Assert.IsTrue(diagnostics.Contains("\"ConnectionMode\":\"Direct\"")); + Assert.AreEqual(httpCallsMade, httpCallsMadeAfterCreation);*/ cosmosClient.Dispose(); + + meterProvider.Dispose(); + + await Task.Delay(1000); } [TestMethod] diff --git a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj index 9978bcce5c..9744fca3e9 100644 --- a/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj +++ b/Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.EmulatorTests/Microsoft.Azure.Cosmos.EmulatorTests.csproj @@ -52,9 +52,10 @@ - + - + +