From a962eca455138a7b49efe43922cb0b3d4d2aa8a6 Mon Sep 17 00:00:00 2001 From: Piotr Karpala Date: Mon, 10 Jun 2024 23:09:19 -0400 Subject: [PATCH 1/5] Support for Default Azure Identity Signed-off-by: Piotr Karpala --- .../Keda.CosmosDb.Scaler.Demo.OrderProcessor.csproj | 1 + src/Scaler.Demo/OrderProcessor/Worker.cs | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/src/Scaler.Demo/OrderProcessor/Keda.CosmosDb.Scaler.Demo.OrderProcessor.csproj b/src/Scaler.Demo/OrderProcessor/Keda.CosmosDb.Scaler.Demo.OrderProcessor.csproj index 2d07aa2..f696b13 100644 --- a/src/Scaler.Demo/OrderProcessor/Keda.CosmosDb.Scaler.Demo.OrderProcessor.csproj +++ b/src/Scaler.Demo/OrderProcessor/Keda.CosmosDb.Scaler.Demo.OrderProcessor.csproj @@ -5,6 +5,7 @@ + diff --git a/src/Scaler.Demo/OrderProcessor/Worker.cs b/src/Scaler.Demo/OrderProcessor/Worker.cs index 949a69d..eb2844a 100644 --- a/src/Scaler.Demo/OrderProcessor/Worker.cs +++ b/src/Scaler.Demo/OrderProcessor/Worker.cs @@ -3,6 +3,7 @@ using System.Net; using System.Threading; using System.Threading.Tasks; +using Azure.Identity; using Keda.CosmosDb.Scaler.Demo.Shared; using Microsoft.Azure.Cosmos; using Microsoft.Extensions.Hosting; @@ -25,6 +26,10 @@ public Worker(CosmosDbConfig cosmosDbConfig, ILogger logger) public override async Task StartAsync(CancellationToken cancellationToken) { + var cosmosClient = _cosmosDbConfig.Connection.Contains("AccountKey") + ? new CosmosClient(_cosmosDbConfig.Connection) + : new CosmosClient(_cosmosDbConfig.Connection, new DefaultAzureCredential()); + Database leaseDatabase = await new CosmosClient(_cosmosDbConfig.LeaseConnection) .CreateDatabaseIfNotExistsAsync(_cosmosDbConfig.LeaseDatabaseId, cancellationToken: cancellationToken); From 5b13abda69ac52029ec1e3f673ba106e77b5d0a1 Mon Sep 17 00:00:00 2001 From: Piotr Karpala Date: Mon, 10 Jun 2024 23:09:35 -0400 Subject: [PATCH 2/5] Support for Default Azure Identity Signed-off-by: Piotr Karpala --- ...da.CosmosDb.Scaler.Demo.OrderGenerator.csproj | 1 + src/Scaler.Demo/OrderGenerator/Program.cs | 7 ++++++- src/Scaler/Keda.CosmosDb.Scaler.csproj | 1 + src/Scaler/Services/CosmosDbFactory.cs | 16 ++++++++-------- src/Scaler/Services/CosmosDbMetricProvider.cs | 1 + 5 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/Scaler.Demo/OrderGenerator/Keda.CosmosDb.Scaler.Demo.OrderGenerator.csproj b/src/Scaler.Demo/OrderGenerator/Keda.CosmosDb.Scaler.Demo.OrderGenerator.csproj index 0cbfce7..4bb7efb 100644 --- a/src/Scaler.Demo/OrderGenerator/Keda.CosmosDb.Scaler.Demo.OrderGenerator.csproj +++ b/src/Scaler.Demo/OrderGenerator/Keda.CosmosDb.Scaler.Demo.OrderGenerator.csproj @@ -6,6 +6,7 @@ + diff --git a/src/Scaler.Demo/OrderGenerator/Program.cs b/src/Scaler.Demo/OrderGenerator/Program.cs index 875094b..77bfac5 100644 --- a/src/Scaler.Demo/OrderGenerator/Program.cs +++ b/src/Scaler.Demo/OrderGenerator/Program.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Threading.Tasks; +using Azure.Identity; using Bogus; using Bogus.DataSets; using Keda.CosmosDb.Scaler.Demo.Shared; @@ -85,7 +86,11 @@ private static bool ReadIsSingleArticle() private static async Task CreateOrdersAsync(int count, bool isSingleArticle) { - Container container = new CosmosClient(_cosmosDbConfig.Connection) + using var cosmosClient = _cosmosDbConfig.Connection.Contains("AccountKey") + ? new CosmosClient(_cosmosDbConfig.Connection, new CosmosClientOptions { ConnectionMode = ConnectionMode.Gateway }) + : new CosmosClient(_cosmosDbConfig.Connection, new DefaultAzureCredential(), new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct }); + + Container container = cosmosClient .GetContainer(_cosmosDbConfig.DatabaseId, _cosmosDbConfig.ContainerId); int remainingCount = count; diff --git a/src/Scaler/Keda.CosmosDb.Scaler.csproj b/src/Scaler/Keda.CosmosDb.Scaler.csproj index 9c9c6e8..5ce777f 100644 --- a/src/Scaler/Keda.CosmosDb.Scaler.csproj +++ b/src/Scaler/Keda.CosmosDb.Scaler.csproj @@ -9,6 +9,7 @@ + diff --git a/src/Scaler/Services/CosmosDbFactory.cs b/src/Scaler/Services/CosmosDbFactory.cs index 25cf364..249c361 100644 --- a/src/Scaler/Services/CosmosDbFactory.cs +++ b/src/Scaler/Services/CosmosDbFactory.cs @@ -1,4 +1,5 @@ using System.Collections.Concurrent; +using Azure.Identity; using Microsoft.Azure.Cosmos; namespace Keda.CosmosDb.Scaler @@ -9,14 +10,13 @@ internal sealed class CosmosDbFactory // maintain a single instance of CosmosClient per lifetime of the application. private readonly ConcurrentDictionary _cosmosClientCache = new(); - public CosmosClient GetCosmosClient(string connection) - { - return _cosmosClientCache.GetOrAdd(connection, CreateCosmosClient); - } + public CosmosClient GetCosmosClient(string connection) => + _cosmosClientCache.GetOrAdd(connection, CreateCosmosClient); + + private CosmosClient CreateCosmosClient(string connection) => + connection.Contains("Accountkey") ? + new CosmosClient(connection, new CosmosClientOptions { ConnectionMode = ConnectionMode.Gateway }) : + new CosmosClient(connection, new DefaultAzureCredential(), new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct }); - private CosmosClient CreateCosmosClient(string connection) - { - return new CosmosClient(connection, new CosmosClientOptions { ConnectionMode = ConnectionMode.Gateway }); - } } } diff --git a/src/Scaler/Services/CosmosDbMetricProvider.cs b/src/Scaler/Services/CosmosDbMetricProvider.cs index 61242dc..db9cd85 100644 --- a/src/Scaler/Services/CosmosDbMetricProvider.cs +++ b/src/Scaler/Services/CosmosDbMetricProvider.cs @@ -3,6 +3,7 @@ using System.Net; using System.Net.Http; using System.Threading.Tasks; +using Azure.Identity; using Microsoft.Azure.Cosmos; using Microsoft.Extensions.Logging; From d43563985926323b8f8c640406eb30c05d36b1d6 Mon Sep 17 00:00:00 2001 From: Piotr Karpala Date: Tue, 11 Jun 2024 13:39:32 -0400 Subject: [PATCH 3/5] typo Signed-off-by: Piotr Karpala --- src/Scaler/Services/CosmosDbFactory.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Scaler/Services/CosmosDbFactory.cs b/src/Scaler/Services/CosmosDbFactory.cs index 249c361..050ad98 100644 --- a/src/Scaler/Services/CosmosDbFactory.cs +++ b/src/Scaler/Services/CosmosDbFactory.cs @@ -14,7 +14,7 @@ public CosmosClient GetCosmosClient(string connection) => _cosmosClientCache.GetOrAdd(connection, CreateCosmosClient); private CosmosClient CreateCosmosClient(string connection) => - connection.Contains("Accountkey") ? + connection.Contains("AccountKey") ? new CosmosClient(connection, new CosmosClientOptions { ConnectionMode = ConnectionMode.Gateway }) : new CosmosClient(connection, new DefaultAzureCredential(), new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct }); From 61df8d9bf1f969b90fafab38b5f1a5cbed85b541 Mon Sep 17 00:00:00 2001 From: Piotr Karpala Date: Tue, 11 Jun 2024 15:48:02 -0400 Subject: [PATCH 4/5] Fixing tests Signed-off-by: Piotr Karpala --- README.md | 14 +- .../CosmosDbScalerServiceTests.cs | 130 ++++++++++++------ src/Scaler/Services/CosmosDbMetricProvider.cs | 4 +- src/Scaler/Services/ScalerMetadata.cs | 24 ++++ 4 files changed, 124 insertions(+), 48 deletions(-) diff --git a/README.md b/README.md index 25876d3..e957432 100644 --- a/README.md +++ b/README.md @@ -68,10 +68,12 @@ The specification below describes the `trigger` metadata in `ScaledObject` resou - type: external metadata: scalerAddress: external-scaler-azure-cosmos-db.keda:4050 # Mandatory. Address of the external scaler service. - connection: # Mandatory. Connection string of Cosmos DB account with monitored container. + connection: # Optional. Connection string of Cosmos DB account with monitored container. Either `connection` or `endpoint` has to be provided. + endpoint: # Optional. Cosmos DB endpoint with monitored container. Either `connection` or `endpoint` has to be provided. databaseId: # Mandatory. ID of Cosmos DB database containing monitored container. containerId: # Mandatory. ID of monitored container. - leaseConnection: # Mandatory. Connection string of Cosmos DB account with lease container. + leaseConnection: # Optional. Connection string of Cosmos DB account with lease container. Either `leaseConnection` or `leaseEndpoint` has to be provided. + leaseEndpoint: # Optional. Cosmos DB endpoint with lease container. Either `leaseConnection` or `leaseEndpoint` has to be provided. leaseDatabaseId: # Mandatory. ID of Cosmos DB database containing lease container. leaseContainerId: # Mandatory. ID of lease container. processorName: # Mandatory. Name of change-feed processor used by listener application. @@ -81,13 +83,13 @@ The specification below describes the `trigger` metadata in `ScaledObject` resou - **`scalerAddress`** - Address of the external scaler service. This would be in format `.:`. If you installed Azure Cosmos DB external scaler Helm chart in `keda` namespace and did not specify custom values, the metadata value would be `external-scaler-azure-cosmos-db.keda:4050`. -- **`connection`** - Connection string of the Cosmos DB account that contains the monitored container. +- **`connection`** or **`endpoint`** - Connection string of the Cosmos DB account or Cosmos DB endpoint that contains the monitored container. - **`databaseId`** - ID of Cosmos DB database that contains the monitored container. - **`containerId`** - ID of the monitored container. -- **`leaseConnection`** - Connection string of the Cosmos DB account that contains the lease container. This can be same or different from the value of `connection` metadata. +- **`leaseConnection`** or **`leaseEndpoint`**- Connection string of the Cosmos DB account or Cosmos DB endpoint that contains the lease container. This can be same or different from the value of `connection` metadata. - **`leaseDatabaseId`** - ID of Cosmos DB database that contains the lease container. This can be same or different from the value of `databaseId` metadata. @@ -95,4 +97,6 @@ The specification below describes the `trigger` metadata in `ScaledObject` resou - **`processorName`** - Name of change-feed processor used by listener application. For more information on this, you can refer to [Implementing the change feed processor](https://docs.microsoft.com/azure/cosmos-db/sql/change-feed-processor#implementing-the-change-feed-processor) section. -> **Note** Ideally, we would have created `TriggerAuthentication` resource that would have prevented us from adding the connection strings in plain text in the `ScaledObject` trigger metadata. However, this is not possible since at the moment, the triggers of `external` type do not support referencing a `TriggerAuthentication` resource ([link](https://keda.sh/docs/scalers/external/#authentication-parameters)). +### Workload Identity support + +To utilize Azure Workload Identity via Default Azure Credential use **`endpoint`** and **`leaseEndpoint`** parameters. \ No newline at end of file diff --git a/src/Scaler.Tests/CosmosDbScalerServiceTests.cs b/src/Scaler.Tests/CosmosDbScalerServiceTests.cs index 4e2dc47..2156b0e 100644 --- a/src/Scaler.Tests/CosmosDbScalerServiceTests.cs +++ b/src/Scaler.Tests/CosmosDbScalerServiceTests.cs @@ -1,3 +1,4 @@ +using System; using System.Threading.Tasks; using Google.Protobuf.Collections; using Moq; @@ -18,10 +19,8 @@ public CosmosDbScalerServiceTests() } [Theory] - [InlineData("connection")] [InlineData("databaseId")] [InlineData("containerId")] - [InlineData("leaseConnection")] [InlineData("leaseDatabaseId")] [InlineData("leaseContainerId")] [InlineData("processorName")] @@ -31,29 +30,31 @@ await Assert.ThrowsAsync( () => _cosmosDbScalerService.IsActive(GetScaledObjectRefWithoutMetadata(metadataKey), null)); } - [Fact] - public async Task IsActive_ReturnsFalseOnZeroPartitions() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task IsActive_ReturnsFalseOnZeroPartitions(bool workloadIdentity) { _metricProviderMock.Setup(provider => provider.GetPartitionCountAsync(It.IsAny())).ReturnsAsync(0L); - IsActiveResponse response = await _cosmosDbScalerService.IsActive(GetScaledObjectRef(), null); + IsActiveResponse response = await _cosmosDbScalerService.IsActive(GetScaledObjectRef(workloadIdentity), null); Assert.False(response.Result); } [Theory] - [InlineData(1L)] - [InlineData(100L)] - public async Task IsActive_ReturnsFalseOnNonZeroPartitions(long partitionCount) + [InlineData(1L, true)] + [InlineData(1L, false)] + [InlineData(100L, true)] + [InlineData(100L, false)] + public async Task IsActive_ReturnsFalseOnNonZeroPartitions(long partitionCount, bool workloadIdentity) { _metricProviderMock.Setup(provider => provider.GetPartitionCountAsync(It.IsAny())).ReturnsAsync(partitionCount); - IsActiveResponse response = await _cosmosDbScalerService.IsActive(GetScaledObjectRef(), null); + IsActiveResponse response = await _cosmosDbScalerService.IsActive(GetScaledObjectRef(workloadIdentity), null); Assert.True(response.Result); } [Theory] - [InlineData("connection")] [InlineData("databaseId")] [InlineData("containerId")] - [InlineData("leaseConnection")] [InlineData("leaseDatabaseId")] [InlineData("leaseContainerId")] [InlineData("processorName")] @@ -64,13 +65,16 @@ await Assert.ThrowsAsync( } [Theory] - [InlineData(0L)] - [InlineData(1L)] - [InlineData(100L)] - public async Task GetMetrics_ReturnsPartitionCount(long partitionCount) + [InlineData(0L, true)] + [InlineData(0L, false)] + [InlineData(1L, true)] + [InlineData(1L, false)] + [InlineData(100L, true)] + [InlineData(100L, false)] + public async Task GetMetrics_ReturnsPartitionCount(long partitionCount, bool workloadIdentity) { _metricProviderMock.Setup(provider => provider.GetPartitionCountAsync(It.IsAny())).ReturnsAsync(partitionCount); - GetMetricsResponse response = await _cosmosDbScalerService.GetMetrics(GetGetMetricsRequest(), null); + GetMetricsResponse response = await _cosmosDbScalerService.GetMetrics(GetGetMetricsRequest(workloadIdentity), null); Assert.Single(response.MetricValues); @@ -82,14 +86,16 @@ public async Task GetMetrics_ReturnsPartitionCount(long partitionCount) } [Theory] - [InlineData("")] - [InlineData("custom-metric-name")] - public async Task GetMetrics_ReturnsSameMetricNameIfPassed(string requestMetricName) + [InlineData("", true)] + [InlineData("", false)] + [InlineData("custom-metric-name", true)] + [InlineData("custom-metric-name", false)] + public async Task GetMetrics_ReturnsSameMetricNameIfPassed(string requestMetricName, bool workloadIdentity) { _metricProviderMock.Setup(provider => provider.GetPartitionCountAsync(It.IsAny())).ReturnsAsync(1L); // No assertion with request.MetricName since it is ignored. - GetMetricsRequest request = GetGetMetricsRequest(); + GetMetricsRequest request = GetGetMetricsRequest(workloadIdentity); request.ScaledObjectRef.ScalerMetadata["metricName"] = requestMetricName; GetMetricsResponse response = await _cosmosDbScalerService.GetMetrics(request, null); @@ -99,10 +105,8 @@ public async Task GetMetrics_ReturnsSameMetricNameIfPassed(string requestMetricN } [Theory] - [InlineData("connection")] [InlineData("databaseId")] [InlineData("containerId")] - [InlineData("leaseConnection")] [InlineData("leaseDatabaseId")] [InlineData("leaseContainerId")] [InlineData("processorName")] @@ -112,10 +116,30 @@ await Assert.ThrowsAsync( () => _cosmosDbScalerService.GetMetricSpec(GetScaledObjectRefWithoutMetadata(metadataKey), null)); } - [Fact] - public async Task GetMetricSpec_ReturnsMetricSpec() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetMetricSpec_DoesNotThrowsOnOptionalMetadata(bool workloadIdentity) + { + await _cosmosDbScalerService.GetMetricSpec(GetScaledObjectRef(workloadIdentity), null); + } + + [Theory] + [InlineData("endpoint", "connection")] + [InlineData("leaseEndpoint", "leaseConnection")] + public async Task GetMetricSpec_ThrowsOnMissingConnections(string firstMetadataKey, string secondMetadataKey) + { + var exception = await Assert.ThrowsAnyAsync( + () => _cosmosDbScalerService.GetMetricSpec(GetScaledObjectRefWithoutMetadata(firstMetadataKey, secondMetadataKey), null)); + Assert.IsType(exception.InnerException); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetMetricSpec_ReturnsMetricSpec(bool workloadIdentity) { - GetMetricSpecResponse response = await _cosmosDbScalerService.GetMetricSpec(GetScaledObjectRef(), null); + GetMetricSpecResponse response = await _cosmosDbScalerService.GetMetricSpec(GetScaledObjectRef(workloadIdentity), null); Assert.Single(response.MetricSpecs); @@ -127,11 +151,13 @@ public async Task GetMetricSpec_ReturnsMetricSpec() } [Theory] - [InlineData("")] - [InlineData("custom-metric-name")] - public async Task GetMetricSpec_ReturnsSameMetricNameIfPassed(string requestMetricName) + [InlineData("", true)] + [InlineData("", false)] + [InlineData("custom-metric-name", true)] + [InlineData("custom-metric-name", false)] + public async Task GetMetricSpec_ReturnsSameMetricNameIfPassed(string requestMetricName, bool workloadIdentity) { - ScaledObjectRef request = GetScaledObjectRef(); + ScaledObjectRef request = GetScaledObjectRef(workloadIdentity); request.ScalerMetadata["metricName"] = requestMetricName; GetMetricSpecResponse response = await _cosmosDbScalerService.GetMetricSpec(request, null); @@ -140,11 +166,20 @@ public async Task GetMetricSpec_ReturnsSameMetricNameIfPassed(string requestMetr Assert.Equal(requestMetricName, response.MetricSpecs[0].MetricName); } - [Fact] - public async Task GetMetricSpec_ReturnsNormalizedMetricName() + [Theory] + [InlineData(true)] + [InlineData(false)] + public async Task GetMetricSpec_ReturnsNormalizedMetricName(bool workloadIdentity) { - ScaledObjectRef request = GetScaledObjectRef(); - request.ScalerMetadata["leaseConnection"] = "AccountEndpoint=https://example.com:443/;AccountKey=ZHVtbXky"; + ScaledObjectRef request = GetScaledObjectRef(workloadIdentity); + if (workloadIdentity) + { + request.ScalerMetadata["leaseEndpoint"] = "https://example.com:443"; + } + else + { + request.ScalerMetadata["leaseConnection"] = "AccountEndpoint=https://example.com:443/;AccountKey=ZHVtbXky"; + } request.ScalerMetadata["leaseDatabaseId"] = "Dummy.Lease.Database.Id"; request.ScalerMetadata["leaseContainerId"] = "Dummy:Lease:Container:Id"; request.ScalerMetadata["processorName"] = "Dummy%Processor%Name"; @@ -158,12 +193,12 @@ public async Task GetMetricSpec_ReturnsNormalizedMetricName() response.MetricSpecs[0].MetricName); } - private static GetMetricsRequest GetGetMetricsRequest() + private static GetMetricsRequest GetGetMetricsRequest(bool workloadIdentity) { return new GetMetricsRequest { MetricName = "dummy-metric-name", - ScaledObjectRef = GetScaledObjectRef(), + ScaledObjectRef = GetScaledObjectRef(workloadIdentity), }; } @@ -176,15 +211,20 @@ private static GetMetricsRequest GetGetMetricsRequestWithoutMetadata(string meta }; } - private static ScaledObjectRef GetScaledObjectRefWithoutMetadata(string metadataKey) + private static ScaledObjectRef GetScaledObjectRefWithoutMetadata(params string[] metadataKeys) { - var scaledObjectRef = GetScaledObjectRef(); - scaledObjectRef.ScalerMetadata.Remove(metadataKey); + var scaledObjectRef = GetScaledObjectRef(workloadIdentity: true); + // this is not technically correct but for sake of the test we need both connection and endpoint to be present + scaledObjectRef.ScalerMetadata["endpoint"] = "https://example1.com:443"; + scaledObjectRef.ScalerMetadata["leaseEndpoint"] = "https://example2.com:443"; + + foreach (string metadataKey in metadataKeys) + scaledObjectRef.ScalerMetadata.Remove(metadataKey); return scaledObjectRef; } - private static ScaledObjectRef GetScaledObjectRef() + private static ScaledObjectRef GetScaledObjectRef(bool workloadIdentity = false) { var scaledObjectRef = new ScaledObjectRef { @@ -194,10 +234,18 @@ private static ScaledObjectRef GetScaledObjectRef() MapField scalerMetadata = scaledObjectRef.ScalerMetadata; - scalerMetadata["connection"] = "AccountEndpoint=https://example1.com:443/;AccountKey=ZHVtbXkx"; + if (workloadIdentity) + { + scalerMetadata["endpoint"] = "https://example1.com:443"; + scalerMetadata["leaseEndpoint"] = "https://example2.com:443"; + } + else + { + scalerMetadata["connection"] = "AccountEndpoint=https://example1.com:443/;AccountKey=ZHVtbXkx"; + scalerMetadata["leaseConnection"] = "AccountEndpoint=https://example2.com:443/;AccountKey=ZHVtbXky"; + } scalerMetadata["databaseId"] = "dummy-database-id"; scalerMetadata["containerId"] = "dummy-container-id"; - scalerMetadata["leaseConnection"] = "AccountEndpoint=https://example2.com:443/;AccountKey=ZHVtbXky"; scalerMetadata["leaseDatabaseId"] = "dummy-lease-database-id"; scalerMetadata["leaseContainerId"] = "dummy-lease-container-id"; scalerMetadata["processorName"] = "dummy-processor-name"; diff --git a/src/Scaler/Services/CosmosDbMetricProvider.cs b/src/Scaler/Services/CosmosDbMetricProvider.cs index db9cd85..8ca242b 100644 --- a/src/Scaler/Services/CosmosDbMetricProvider.cs +++ b/src/Scaler/Services/CosmosDbMetricProvider.cs @@ -25,11 +25,11 @@ public async Task GetPartitionCountAsync(ScalerMetadata scalerMetadata) try { Container leaseContainer = _factory - .GetCosmosClient(scalerMetadata.LeaseConnection) + .GetCosmosClient(scalerMetadata.LeaseConnection ?? scalerMetadata.LeaseEndpoint) .GetContainer(scalerMetadata.LeaseDatabaseId, scalerMetadata.LeaseContainerId); ChangeFeedEstimator estimator = _factory - .GetCosmosClient(scalerMetadata.Connection) + .GetCosmosClient(scalerMetadata.Connection ?? scalerMetadata.Endpoint) .GetContainer(scalerMetadata.DatabaseId, scalerMetadata.ContainerId) .GetChangeFeedEstimator(scalerMetadata.ProcessorName, leaseContainer); diff --git a/src/Scaler/Services/ScalerMetadata.cs b/src/Scaler/Services/ScalerMetadata.cs index a2b6d74..05a03a1 100644 --- a/src/Scaler/Services/ScalerMetadata.cs +++ b/src/Scaler/Services/ScalerMetadata.cs @@ -1,5 +1,6 @@ using System; using System.Data.Common; +using System.Runtime.Serialization; using Newtonsoft.Json; namespace Keda.CosmosDb.Scaler @@ -9,10 +10,16 @@ internal sealed class ScalerMetadata { private string _metricName; + [JsonProperty(Required = Required.Default)] public string Connection { get; set; } + [JsonProperty(Required = Required.Default)] + public string Endpoint { get; set; } public string DatabaseId { get; set; } public string ContainerId { get; set; } + [JsonProperty(Required = Required.Default)] public string LeaseConnection { get; set; } + [JsonProperty(Required = Required.Default)] + public string LeaseEndpoint { get; set; } public string LeaseDatabaseId { get; set; } public string LeaseContainerId { get; set; } public string ProcessorName { get; set; } @@ -39,6 +46,10 @@ private string LeaseAccountHost { get { + if (string.IsNullOrEmpty(LeaseConnection)) + { + return new Uri(LeaseEndpoint).Host; + } var builder = new DbConnectionStringBuilder { ConnectionString = this.LeaseConnection }; return new Uri((string)builder["AccountEndpoint"]).Host; } @@ -48,5 +59,18 @@ public static ScalerMetadata Create(ScaledObjectRef scaledObjectRef) { return JsonConvert.DeserializeObject(scaledObjectRef.ScalerMetadata.ToString()); } + + [OnDeserialized] + internal void OnDeserializedMethod(StreamingContext context) + { + if (string.IsNullOrEmpty(LeaseConnection) && string.IsNullOrEmpty(LeaseEndpoint)) + { + throw new JsonSerializationException("Both LeaseConnection and LeaseEndpoint are missing."); + } + if(string.IsNullOrEmpty(Connection) && string.IsNullOrEmpty(Endpoint)) + { + throw new JsonSerializationException("Both Connection and Endpoint are missing."); + } + } } } From 3dc96e39daad46d31b12a994c9b72c4907faa4b5 Mon Sep 17 00:00:00 2001 From: Piotr Karpala Date: Wed, 12 Jun 2024 14:34:29 -0400 Subject: [PATCH 5/5] logging Signed-off-by: Piotr Karpala --- .gitignore | 1 + .vscode/launch.json | 25 +++++++++++++++++++ src/Scaler.Demo/OrderGenerator/Program.cs | 11 +++++--- src/Scaler.Demo/OrderProcessor/Worker.cs | 4 +-- .../CosmosDbScalerServiceTests.cs | 4 ++- src/Scaler/Services/CosmosDbMetricProvider.cs | 10 +++++++- src/Scaler/Services/CosmosDbScalerService.cs | 8 +++++- 7 files changed, 55 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 8afdcb6..ae8a90c 100644 --- a/.gitignore +++ b/.gitignore @@ -452,3 +452,4 @@ $RECYCLE.BIN/ !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json +src/Scaler.Demo/OrderProcessor/appsettings.Development.json diff --git a/.vscode/launch.json b/.vscode/launch.json index 1da3396..41d4b7c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -30,6 +30,31 @@ "name": ".NET Core Attach", "type": "coreclr", "request": "attach" + }, + { + // Use IntelliSense to find out which attributes exist for C# debugging + // Use hover for the description of the existing attributes + // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md + "name": "Order Processor", + "type": "coreclr", + "request": "launch", + "preLaunchTask": "build", + // If you have changed target frameworks, make sure to update the program path. + "program": "${workspaceFolder}/src/Scaler.Demo/OrderProcessor/bin/Debug/net6.0/Keda.CosmosDb.Scaler.Demo.OrderProcessor.dll", + "args": [], + "cwd": "${workspaceFolder}/src/Scaler.Demo/OrderProcessor", + "stopAtEntry": false, + // Enable launching a web browser when ASP.NET Core starts. For more information: https://aka.ms/VSCode-CS-LaunchJson-WebBrowser + "serverReadyAction": { + "action": "openExternally", + "pattern": "\\bNow listening on:\\s+(https?://\\S+)" + }, + "env": { + "ASPNETCORE_ENVIRONMENT": "Development", + }, + "sourceFileMap": { + "/Views": "${workspaceFolder}/Views" + } } ] } \ No newline at end of file diff --git a/src/Scaler.Demo/OrderGenerator/Program.cs b/src/Scaler.Demo/OrderGenerator/Program.cs index 77bfac5..d473a7a 100644 --- a/src/Scaler.Demo/OrderGenerator/Program.cs +++ b/src/Scaler.Demo/OrderGenerator/Program.cs @@ -132,8 +132,11 @@ private static async Task CreateOrderAsync(Container container, string article) private static async Task SetupAsync() { Console.WriteLine($"Creating database: {_cosmosDbConfig.DatabaseId}"); + using var cosmosClient = _cosmosDbConfig.Connection.Contains("AccountKey") + ? new CosmosClient(_cosmosDbConfig.Connection, new CosmosClientOptions { ConnectionMode = ConnectionMode.Gateway }) + : new CosmosClient(_cosmosDbConfig.Connection, new DefaultAzureCredential(), new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct }); - Database database = await new CosmosClient(_cosmosDbConfig.Connection) + Database database = await cosmosClient .CreateDatabaseIfNotExistsAsync(_cosmosDbConfig.DatabaseId); Console.WriteLine($"Creating container: {_cosmosDbConfig.ContainerId} with throughput: {_cosmosDbConfig.ContainerThroughput} RU/s"); @@ -147,12 +150,14 @@ await database.CreateContainerIfNotExistsAsync( private static async Task TeardownAsync() { - var client = new CosmosClient(_cosmosDbConfig.Connection); + using var cosmosClient = _cosmosDbConfig.Connection.Contains("AccountKey") + ? new CosmosClient(_cosmosDbConfig.Connection, new CosmosClientOptions { ConnectionMode = ConnectionMode.Gateway }) + : new CosmosClient(_cosmosDbConfig.Connection, new DefaultAzureCredential(), new CosmosClientOptions { ConnectionMode = ConnectionMode.Direct }); try { Console.WriteLine($"Deleting database: {_cosmosDbConfig.DatabaseId}"); - await client.GetDatabase(_cosmosDbConfig.DatabaseId).DeleteAsync(); + await cosmosClient.GetDatabase(_cosmosDbConfig.DatabaseId).DeleteAsync(); } catch (CosmosException) { diff --git a/src/Scaler.Demo/OrderProcessor/Worker.cs b/src/Scaler.Demo/OrderProcessor/Worker.cs index eb2844a..17eec0c 100644 --- a/src/Scaler.Demo/OrderProcessor/Worker.cs +++ b/src/Scaler.Demo/OrderProcessor/Worker.cs @@ -30,7 +30,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) ? new CosmosClient(_cosmosDbConfig.Connection) : new CosmosClient(_cosmosDbConfig.Connection, new DefaultAzureCredential()); - Database leaseDatabase = await new CosmosClient(_cosmosDbConfig.LeaseConnection) + Database leaseDatabase = await cosmosClient .CreateDatabaseIfNotExistsAsync(_cosmosDbConfig.LeaseDatabaseId, cancellationToken: cancellationToken); Container leaseContainer = await leaseDatabase @@ -42,7 +42,7 @@ public override async Task StartAsync(CancellationToken cancellationToken) // Change feed processor instance name should be unique for each container application. string instanceName = $"Instance-{Dns.GetHostName()}"; - _processor = new CosmosClient(_cosmosDbConfig.Connection) + _processor = cosmosClient .GetContainer(_cosmosDbConfig.DatabaseId, _cosmosDbConfig.ContainerId) .GetChangeFeedProcessorBuilder(_cosmosDbConfig.ProcessorName, ProcessOrdersAsync) .WithInstanceName(instanceName) diff --git a/src/Scaler.Tests/CosmosDbScalerServiceTests.cs b/src/Scaler.Tests/CosmosDbScalerServiceTests.cs index 2156b0e..58ade43 100644 --- a/src/Scaler.Tests/CosmosDbScalerServiceTests.cs +++ b/src/Scaler.Tests/CosmosDbScalerServiceTests.cs @@ -1,6 +1,7 @@ using System; using System.Threading.Tasks; using Google.Protobuf.Collections; +using Microsoft.Extensions.Logging; using Moq; using Newtonsoft.Json; using Xunit; @@ -15,7 +16,8 @@ public class CosmosDbScalerServiceTests public CosmosDbScalerServiceTests() { _metricProviderMock = new Mock(); - _cosmosDbScalerService = new CosmosDbScalerService(_metricProviderMock.Object); + var _loggerMock = new Mock>(); + _cosmosDbScalerService = new CosmosDbScalerService(_metricProviderMock.Object, _loggerMock.Object); } [Theory] diff --git a/src/Scaler/Services/CosmosDbMetricProvider.cs b/src/Scaler/Services/CosmosDbMetricProvider.cs index 8ca242b..355110a 100644 --- a/src/Scaler/Services/CosmosDbMetricProvider.cs +++ b/src/Scaler/Services/CosmosDbMetricProvider.cs @@ -41,10 +41,18 @@ public async Task GetPartitionCountAsync(ScalerMetadata scalerMetadata) while (iterator.HasMoreResults) { FeedResponse states = await iterator.ReadNextAsync(); - partitionCount += states.Where(state => state.EstimatedLag > 0).Count(); + + foreach (ChangeFeedProcessorState leaseState in states) + { + string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}"; + _logger.LogInformation("Lease [{LeaseToken}] {host} reports {EstimatedLag} as estimated lag.", leaseState.LeaseToken, host, leaseState.EstimatedLag); + + partitionCount += leaseState.EstimatedLag > 0 ? 1 : 0; + } } } + _logger.LogInformation("Returning active {partitionCount}", partitionCount); return partitionCount; } catch (CosmosException exception) diff --git a/src/Scaler/Services/CosmosDbScalerService.cs b/src/Scaler/Services/CosmosDbScalerService.cs index 0ce6c69..dc7950c 100644 --- a/src/Scaler/Services/CosmosDbScalerService.cs +++ b/src/Scaler/Services/CosmosDbScalerService.cs @@ -8,10 +8,12 @@ namespace Keda.CosmosDb.Scaler internal sealed class CosmosDbScalerService : ExternalScaler.ExternalScalerBase { private readonly ICosmosDbMetricProvider _metricProvider; + private readonly ILogger _logger; - public CosmosDbScalerService(ICosmosDbMetricProvider metricProvider) + public CosmosDbScalerService(ICosmosDbMetricProvider metricProvider, ILogger logger) { _metricProvider = metricProvider ?? throw new ArgumentNullException(nameof(metricProvider)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); } public override async Task IsActive(ScaledObjectRef request, ServerCallContext context) @@ -19,6 +21,8 @@ public override async Task IsActive(ScaledObjectRef request, S var scalerMetadata = ScalerMetadata.Create(request); bool isActive = (await _metricProvider.GetPartitionCountAsync(scalerMetadata)) > 0L; + + _logger.LogInformation("Scaler is {status}", isActive ? "active" : "inactive"); return new IsActiveResponse { Result = isActive }; } @@ -34,6 +38,7 @@ public override async Task GetMetrics(GetMetricsRequest requ MetricValue_ = await _metricProvider.GetPartitionCountAsync(scalerMetadata), }); + _logger.LogInformation("Returning metric value {value} for metric {metric}", response.MetricValues[0].MetricValue_, response.MetricValues[0].MetricName); return response; } @@ -49,6 +54,7 @@ public override Task GetMetricSpec(ScaledObjectRef reques TargetSize = 1L, }); + _logger.LogInformation("Returning target size {size} for metric {metric}", response.MetricSpecs[0].TargetSize, response.MetricSpecs[0].MetricName); return Task.FromResult(response); } }