Skip to content

Commit

Permalink
Validation and health check changes. (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
chullybun authored Apr 9, 2024
1 parent 929adec commit 7d9737d
Show file tree
Hide file tree
Showing 37 changed files with 560 additions and 454 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,20 @@

Represents the **NuGet** versions.

## v3.17.0
- *Enhancement*: Additional `CoreEx.Validation` usability improvements:
- `Validator.CreateFor<T>` added to enable the creation of a `CommonValidator<T>` instance for a specified type `T` (more purposeful name); synonym for existing `CommonValidator.Create<T>` (unchanged).
- `Validator.Null<T>` added to enable simplified specification of a `IValidatorEx<T>` of `null` to avoid explicit `null` casting.
- `Collection` extension method has additional overload to pass in the `IValidatorEx<TItem>` to use for each item in the collection; versus, having to use `CollectionRuleItem.Create`.
- `Dictionary` extension method has additional overload to pass in the `IValidatorEx<TKey>` and `IValidator<TValue>` to use for each entry in the dictionary; versus, having to use `DictionaryRuleItem.Create`.
- `MinimumCount` and `MaximumCount` extension methods for `ICollection` added to enable explicit specification of these two basic validations.
- `Validation.CreateCollection` renamed to `Validation.CreateForCollection` and creates a `CommonValidator<TColl>`.
- Existing `CollectionValidator` deprecated as the `CommonValidator<TColl>` offers same; removes duplication of capability.
- `Validation.CreateDictionary` renamed to `Validation.CreateForDictionary` and creates a `CommonValidator<TDict>`.
- Existing `DictionaryValidator` deprecated as the `CommonValidator<TDict>` offers same; removes duplication of capability.
- *Enhancement*: Added `ServiceBusReceiverHealthCheck` to perform a peek message on the `ServiceBusReceiver` as a means to determine health. Use `IHealthChecksBuilder.AddServiceBusReceiverHealthCheck` to configure.
- *Fixed:* The `FileLockSynchronizer`, `BlobLeaseSynchronizer` and `TableWorkStatePersistence` have had any file/blob/table validations/manipulations moved from the constructor to limit critical failures at startup from a DI perspective; now only performed where required/used. This also allows improved health check opportunities as means to verify.

## v3.16.0
- *Enhancement*: Added basic [FluentValidator](https://docs.fluentvalidation.net/en/latest/) compatibility to the `CoreEx.Validation` by supporting _key_ (common) named capabilities:
- `AbstractValidator<T>` added as a wrapper for `Validator<T>`; with both supporting `RuleFor` method (wrapper for existing `Property`).
Expand Down
2 changes: 1 addition & 1 deletion Common.targets
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>3.16.0</Version>
<Version>3.17.0</Version>
<LangVersion>preview</LangVersion>
<Authors>Avanade</Authors>
<Company>Avanade</Company>
Expand Down
7 changes: 5 additions & 2 deletions samples/My.Hr/My.Hr.Api/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
using OpenTelemetry.Trace;
using Az = Azure.Messaging.ServiceBus;
using CoreEx.Database.HealthChecks;
using CoreEx.Azure.ServiceBus.HealthChecks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;

namespace My.Hr.Api;

Expand Down Expand Up @@ -51,8 +54,8 @@ public void ConfigureServices(IServiceCollection services)

// Register the health checks.
services
.AddHealthChecks();
//.AddTypeActivatedCheck<AzureServiceBusQueueHealthCheck>("Verification Queue", HealthStatus.Unhealthy, nameof(HrSettings.ServiceBusConnection), nameof(HrSettings.VerificationQueueName))
.AddHealthChecks()
.AddServiceBusReceiverHealthCheck(sp => sp.GetRequiredService<Az.ServiceBusClient>().CreateReceiver(sp.GetRequiredService<HrSettings>().VerificationQueueName), "verification-queue");

services.AddControllers();

Expand Down
2 changes: 1 addition & 1 deletion samples/My.Hr/My.Hr.Api/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
}
},
"AllowedHosts": "*",
"VerificationQueueName": "pendingVerifications",
"VerificationQueueName": "verification-queue",
"ServiceBusConnection": "coreex.servicebus.windows.net",
"ConnectionStrings": {
"Database": "Data Source=.;Initial Catalog=My.HrDb;Integrated Security=True;TrustServerCertificate=true"
Expand Down
3 changes: 3 additions & 0 deletions samples/My.Hr/My.Hr.Api/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,8 @@
"AbsoluteExpirationRelativeToNow": "03:00:00",
"SlidingExpiration": "00:45:00"
}
},
"ServiceBusConnection": {
"fullyQualifiedNamespace": "Endpoint=sb://top-secret.servicebus.windows.net/;SharedAccessKeyName=top-secret;SharedAccessKey=top-encrypted-secret"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/CoreEx

using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace CoreEx.Azure.ServiceBus.HealthChecks
{
/// <summary>
/// Provides a <see cref="ServiceBusReceiver"/> <see cref="IHealthCheck"/> to verify the receiver is accessible by peeking a message.
/// </summary>
/// <param name="receiverFactory">The <see cref="ServiceBusReceiver"/> create factory.</param>
public class ServiceBusReceiverHealthCheck(Func<ServiceBusReceiver> receiverFactory) : IHealthCheck
{
private readonly Func<ServiceBusReceiver> _receiverFactory = receiverFactory.ThrowIfNull(nameof(receiverFactory));

/// <inheritdoc/>
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
{
await using var receiver = _receiverFactory() ?? throw new InvalidOperationException("The ServiceBusReceiver factory returned null.");
var msg = await receiver.PeekMessageAsync(null, cancellationToken).ConfigureAwait(false);
return HealthCheckResult.Healthy(null, new Dictionary<string, object>{ { "message", msg is null ? "none" : new Message { MessageId = msg.MessageId, CorrelationId = msg.CorrelationId, Subject = msg.Subject, SessionId = msg.SessionId, PartitionKey = msg.PartitionKey } } });
}

private class Message
{
public string? MessageId { get; set; }
public string? CorrelationId { get; set; }
public string? Subject { get; set; }
public string? SessionId { get; set; }
public string? PartitionKey { get; set; }
}
}
}
22 changes: 22 additions & 0 deletions src/CoreEx.Azure/ServiceBus/IServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

using CoreEx;
using CoreEx.Azure.ServiceBus;
using CoreEx.Azure.ServiceBus.HealthChecks;
using CoreEx.Configuration;
using CoreEx.Events;
using CoreEx.Events.Subscribing;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using Asb = Azure.Messaging.ServiceBus;

namespace Microsoft.Extensions.DependencyInjection
Expand Down Expand Up @@ -75,5 +78,24 @@ public static IServiceCollection AddAzureServiceBusPurger(this IServiceCollectio
configure?.Invoke(sp, sbp);
return sbp;
});

/// <summary>
/// Adds a <see cref="ServiceBusReceiverHealthCheck"/> that will peek a message from the Azure Service Bus receiver to confirm health.
/// </summary>
/// <param name="builder">The <see cref="IHealthChecksBuilder"/>.</param>
/// <param name="name">The health check name. Defaults to '<c>azure-service-bus-receiver</c>'.</param>
/// <param name="serviceBusReceiverFactory">The <see cref="Asb.ServiceBusReceiver"/> factory.</param>
/// <param name="failureStatus">The <see cref="HealthStatus"/> that should be reported when the health check reports a failure. If the provided value is <c>null</c>, then <see cref="HealthStatus.Unhealthy"/> will be reported.</param>
/// <param name="tags">A list of tags that can be used for filtering health checks.</param>
/// <param name="timeout">An optional <see cref="TimeSpan"/> representing the timeout of the check.</param>
public static IHealthChecksBuilder AddServiceBusReceiverHealthCheck(this IHealthChecksBuilder builder, Func<IServiceProvider, Asb.ServiceBusReceiver> serviceBusReceiverFactory, string? name = null, HealthStatus? failureStatus = default, IEnumerable<string>? tags = default, TimeSpan? timeout = default)
{
serviceBusReceiverFactory.ThrowIfNull(nameof(serviceBusReceiverFactory));

return builder.Add(new HealthCheckRegistration(name ?? "azure-service-bus-receiver", sp =>
{
return new ServiceBusReceiverHealthCheck(() => serviceBusReceiverFactory(sp));
}, failureStatus, tags, timeout));
}
}
}
4 changes: 2 additions & 2 deletions src/CoreEx.Azure/Storage/BlobLeaseSynchronizer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public class BlobLeaseSynchronizer : IServiceSynchronizer
public BlobLeaseSynchronizer(BlobContainerClient client)
{
_client = client.ThrowIfNull(nameof(client));
_client.CreateIfNotExists();

_timer = new Lazy<Timer>(() => new Timer(_ =>
{
foreach (var kvp in _dict.ToArray())
Expand Down Expand Up @@ -74,6 +72,8 @@ public bool Enter<T>(string? name = null)

_dict.GetOrAdd(GetName<T>(name), fn =>
{
_client.CreateIfNotExists();

var blob = _client.GetBlobClient(GetName<T>(name));
try
{
Expand Down
47 changes: 43 additions & 4 deletions src/CoreEx.Azure/Storage/TableWorkStatePersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public class TableWorkStatePersistence : IWorkStatePersistence
private readonly TableClient _workStateTableClient;
private readonly TableClient _workDataTableClient;
private readonly IJsonSerializer _jsonSerializer;
private readonly SemaphoreSlim _semaphore = new(1, 1);
private volatile bool _firstTime = true;

/// <summary>
/// Initializes a new instance of the <see cref="TableWorkStatePersistence"/> class.
Expand Down Expand Up @@ -84,6 +86,7 @@ private class WorkDataEntity() : ITableEntity
private const int _maxChunks = 15;
private const int _maxSize = _chunkSize * _maxChunks;
private readonly BinaryData?[] _data = new BinaryData?[_maxChunks];

public WorkDataEntity(BinaryData data) : this()
{
var arr = data.ToArray();
Expand All @@ -101,6 +104,7 @@ public WorkDataEntity(BinaryData data) : this()
_data[i++] = BinaryData.FromBytes(chunk);
}
}

public string PartitionKey { get; set; } = GetPartitionKey();
public string RowKey { get; set; } = null!;
public DateTimeOffset? Timestamp { get; set; }
Expand Down Expand Up @@ -149,9 +153,35 @@ public WorkDataEntity(BinaryData data) : this()
/// </summary>
private static string GetPartitionKey() => (ExecutionContext.HasCurrent ? ExecutionContext.Current.TenantId : null) ?? "default";

/// <summary>
/// Creates the tables if they do not already exist.
/// </summary>
private async Task CreateIfNotExistsAsync(CancellationToken cancellationToken)
{
if (_firstTime)
{
await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
if (_firstTime)
{
await _workDataTableClient.CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
await _workStateTableClient.CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
_firstTime = false;
}
}
finally
{
_semaphore.Release();
}
}
}

/// <inheritdoc/>
public async Task<WorkState?> GetAsync(string id, CancellationToken cancellationToken)
{
await CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);

var er = await _workStateTableClient.GetEntityIfExistsAsync<WorkStateEntity>(GetPartitionKey(), id, cancellationToken: cancellationToken).ConfigureAwait(false);
if (!er.HasValue)
return null;
Expand Down Expand Up @@ -182,25 +212,34 @@ public WorkDataEntity(BinaryData data) : this()
/// <summary>
/// Performs an upsert (create/update).
/// </summary>
private async Task UpsertAsync(WorkState state, CancellationToken cancellationToken)
=> await _workStateTableClient.UpsertEntityAsync(new WorkStateEntity(state), TableUpdateMode.Replace, cancellationToken).ConfigureAwait(false);
private async Task UpsertAsync(WorkState state, CancellationToken cancellationToken)
{
await CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
await _workStateTableClient.UpsertEntityAsync(new WorkStateEntity(state), TableUpdateMode.Replace, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public async Task DeleteAsync(string id, CancellationToken cancellationToken)
{
await CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
await _workDataTableClient.DeleteEntityAsync(GetPartitionKey(), id, cancellationToken: cancellationToken).ConfigureAwait(false);
await _workStateTableClient.DeleteEntityAsync(GetPartitionKey(), id, cancellationToken: cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public async Task<BinaryData?> GetDataAsync(string id, CancellationToken cancellationToken)
{
await CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);

var er = await _workDataTableClient.GetEntityIfExistsAsync<WorkDataEntity>(GetPartitionKey(), id, cancellationToken: cancellationToken).ConfigureAwait(false);
return er.HasValue ? er.Value!.ToSingleData() : null;
}

/// <inheritdoc/>
public Task SetDataAsync(string id, BinaryData data, CancellationToken cancellationToken)
=> _workDataTableClient.UpsertEntityAsync(new WorkDataEntity(data) { PartitionKey = GetPartitionKey(), RowKey = id }, TableUpdateMode.Replace, cancellationToken: cancellationToken);
public async Task SetDataAsync(string id, BinaryData data, CancellationToken cancellationToken)
{
await CreateIfNotExistsAsync(cancellationToken).ConfigureAwait(false);
await _workDataTableClient.UpsertEntityAsync(new WorkDataEntity(data) { PartitionKey = GetPartitionKey(), RowKey = id }, TableUpdateMode.Replace, cancellationToken: cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static IServiceCollection AddSqlServerEventOutboxHostedService(this IServ
var hc = healthCheck ? new TimerHostedServiceHealthCheck() : null;
if (hc is not null)
{
var sb = new StringBuilder("EventOutbox");
var sb = new StringBuilder("sql-server-event-outbox");
if (partitionKey is not null)
sb.Append($"-PartitionKey-{partitionKey}");

Expand Down
34 changes: 30 additions & 4 deletions src/CoreEx.Database/DatabaseServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,20 @@ public static class DatabaseServiceCollectionExtensions
public static IServiceCollection AddDatabase(this IServiceCollection services, Func<IServiceProvider, IDatabase> create, bool healthCheck = true)
{
services.AddScoped(sp => create(sp) ?? throw new InvalidOperationException($"An {nameof(IDatabase)} instance must be instantiated."));
return AddHealthCheck(services, healthCheck);
return AddHealthCheck(services, healthCheck, null);
}

/// <summary>
/// Adds an <see cref="IDatabase"/> as a scoped service including a corresponding health check.
/// </summary>
/// <param name="services">The <see cref="IServiceCollection"/>.</param>
/// <param name="create">The function to create the <see cref="IDatabase"/> instance.</param>
/// <param name="healthCheckName">The health check name; defaults to '<c>database</c>'.</param>
/// <returns>The <see cref="IServiceCollection"/> to support fluent-style method-chaining.</returns>
public static IServiceCollection AddDatabase(this IServiceCollection services, Func<IServiceProvider, IDatabase> create, string? healthCheckName)
{
services.AddScoped(sp => create(sp) ?? throw new InvalidOperationException($"An {nameof(IDatabase)} instance must be instantiated."));
return AddHealthCheck(services, true, healthCheckName);
}

/// <summary>
Expand All @@ -35,16 +48,29 @@ public static IServiceCollection AddDatabase(this IServiceCollection services, F
public static IServiceCollection AddDatabase<TDb>(this IServiceCollection services, bool healthCheck = true) where TDb : class, IDatabase
{
services.AddScoped<IDatabase, TDb>();
return AddHealthCheck(services, healthCheck);
return AddHealthCheck(services, healthCheck, null);
}

/// <summary>
/// Adds an <see cref="IDatabase"/> as a scoped service including a corresponding health check.
/// </summary>
/// <typeparam name="TDb">The <see cref="IDatabase"/> <see cref="Type"/>.</typeparam>
/// <param name="services">The <see cref="IServiceCollection"/>.</param>
/// <param name="healthCheckName">The health check name; defaults to '<c>database</c>'.</param>
/// <returns>The <see cref="IServiceCollection"/> to support fluent-style method-chaining.</returns>
public static IServiceCollection AddDatabase<TDb>(this IServiceCollection services, string? healthCheckName) where TDb : class, IDatabase
{
services.AddScoped<IDatabase, TDb>();
return AddHealthCheck(services, true, healthCheckName);
}

/// <summary>
/// Adds the <see cref="DatabaseHealthCheck{TDatabase}"/> where configured to do so.
/// </summary>
private static IServiceCollection AddHealthCheck(this IServiceCollection services, bool healthCheck)
private static IServiceCollection AddHealthCheck(this IServiceCollection services, bool healthCheck, string? healthCheckName)
{
if (healthCheck)
services.AddHealthChecks().AddTypeActivatedCheck<DatabaseHealthCheck<IDatabase>>("Database", HealthStatus.Unhealthy, tags: default!, timeout: TimeSpan.FromSeconds(30));
services.AddHealthChecks().AddTypeActivatedCheck<DatabaseHealthCheck<IDatabase>>(healthCheckName ?? "database", HealthStatus.Unhealthy, tags: default!, timeout: TimeSpan.FromSeconds(30));

return services;
}
Expand Down
Loading

0 comments on commit 7d9737d

Please sign in to comment.