Skip to content

Commit

Permalink
temp fix for multi-tenancy + health checks. Closes GH-3352
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Aug 4, 2024
1 parent e4df230 commit 504f036
Showing 1 changed file with 69 additions and 37 deletions.
106 changes: 69 additions & 37 deletions src/Marten.AspNetCore/Daemon/AsyncDaemonHealthCheckExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
Expand All @@ -17,20 +16,32 @@ namespace Marten.Events.Daemon;
public static class AsyncDaemonHealthCheckExtensions
{
/// <summary>
/// Adds a health check for Marten's Async Daemon.
/// The health check will verify that no async projection progression is lagging behind more than the <paramref name="maxEventLag"/>
/// The check will return <see cref="HealthCheckResult.Unhealthy"/> if any progression is more than <paramref name="maxEventLag"/> behind the highWaterMark OR if any exception is thrown while doing the check.
/// <example>
/// <code>
/// Adds a health check for Marten's Async Daemon.
/// The health check will verify that no async projection progression is lagging behind more than the
/// <paramref name="maxEventLag" />
/// The check will return <see cref="HealthCheckResult.Unhealthy" /> if any progression is more than
/// <paramref name="maxEventLag" /> behind the highWaterMark OR if any exception is thrown while doing the check.
/// <example>
/// <code>
/// Customized Injection Example: services.AddHealthChecks().AddMartenAsyncDaemonHealthCheck(150);
/// </code>
/// </example>
/// Also - remember to add <c>app.MapHealthChecks("/your-health-path")</c> to the middleware pipeline
/// </example>
/// Also - remember to add <c>app.MapHealthChecks("/your-health-path")</c> to the middleware pipeline
/// </summary>
/// <param name="builder"><see cref="IHealthChecksBuilder"/></param>
/// <param name="maxEventLag">(optional) Acceptable lag of an event projection before it's considered unhealthy - defaults to 100</param>
/// <param name="maxSameLagTime">(optional) Treat projection as healthy if maxEventLag exceeded, but projection sequence changed since last check in given time - defaults to null (uses just maxEventLag)</param>
/// <returns>If healthy: <see cref="HealthCheckResult.Healthy"/> - else <see cref="HealthCheckResult.Unhealthy"/></returns>
/// <param name="builder">
/// <see cref="IHealthChecksBuilder" />
/// </param>
/// <param name="maxEventLag">
/// (optional) Acceptable lag of an event projection before it's considered unhealthy - defaults
/// to 100
/// </param>
/// <param name="maxSameLagTime">
/// (optional) Treat projection as healthy if maxEventLag exceeded, but projection sequence
/// changed since last check in given time - defaults to null (uses just maxEventLag)
/// </param>
/// <returns>
/// If healthy: <see cref="HealthCheckResult.Healthy" /> - else <see cref="HealthCheckResult.Unhealthy" />
/// </returns>
public static IHealthChecksBuilder AddMartenAsyncDaemonHealthCheck(
this IHealthChecksBuilder builder,
int maxEventLag = 100,
Expand All @@ -46,39 +57,39 @@ public static IHealthChecksBuilder AddMartenAsyncDaemonHealthCheck(
}

/// <summary>
/// Internal class used to DI settings to async daemon health check
/// Internal class used to DI settings to async daemon health check
/// </summary>
/// <param name="MaxEventLag">Acceptable lag of an event projection before it is considered unhealthy.</param>
/// <param name="MaxSameLagTime">Optional time setting copied into the health check's _maxSameLagTime field; defaults to null.</param>
internal record AsyncDaemonHealthCheckSettings(int MaxEventLag, TimeSpan? MaxSameLagTime = null);

/// <summary>
/// Health check implementation
/// Health check implementation
/// </summary>
internal class AsyncDaemonHealthCheck: IHealthCheck
{
/// <summary>
/// The <see cref="DocumentStore"/> to check health for.
/// </summary>
private readonly IDocumentStore _store;
private readonly ConcurrentDictionary<string, (DateTime CheckedAt, long Sequence)>
_lastProjectionsChecks = new();

/// <summary>
/// The allowed event projection processing lag compared to the HighWaterMark.
/// The allowed event projection processing lag compared to the HighWaterMark.
/// </summary>
private readonly int _maxEventLag;

/// <summary>
/// The maximum time an event projection is allowed to keep lagging (by maxEventLag).
/// If not provided, every projection is considered lagging if HighWaterMark - projection.Position >= maxEventLag.
/// If provided, a projection is considered lagging only if projection.Position has also stayed the same for the given time.
/// To rely only on time, set _maxEventLag=1 and maxSameLagTime to the desired value.
/// The maximum time an event projection is allowed to keep lagging (by maxEventLag).
/// If not provided, every projection is considered lagging if HighWaterMark - projection.Position >= maxEventLag.
/// If provided, a projection is considered lagging only if projection.Position has also stayed the same for the given time.
/// To rely only on time, set _maxEventLag=1 and maxSameLagTime to the desired value.
/// </summary>
private readonly TimeSpan? _maxSameLagTime;

private readonly TimeProvider _timeProvider;
/// <summary>
/// The <see cref="DocumentStore" /> to check health for.
/// </summary>
private readonly IDocumentStore _store;

private readonly ConcurrentDictionary<string, (DateTime CheckedAt, long Sequence)>
_lastProjectionsChecks = new();
private readonly TimeProvider _timeProvider;

public AsyncDaemonHealthCheck(IDocumentStore store, AsyncDaemonHealthCheckSettings settings,
TimeProvider timeProvider)
Expand All @@ -89,7 +100,7 @@ public AsyncDaemonHealthCheck(IDocumentStore store, AsyncDaemonHealthCheckSettin
_maxSameLagTime = settings.MaxSameLagTime;
}

public Task<HealthCheckResult> CheckHealthAsync(
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default
)
Expand All @@ -101,26 +112,47 @@ public Task<HealthCheckResult> CheckHealthAsync(
.Select(x => $"{x.ProjectionName}:All")
.ToHashSet();

var database = _store.Storage.Database;
var databases = await _store.Storage.AllDatabases().ConfigureAwait(false);

if (databases.Count == 1)
{
return await CheckProjectionHealthAsync(databases[0], projectionsToCheck, cancellationToken)
.ConfigureAwait(false);
}
else
{
// Cheesy, but just return the first non-healthy result found while checking each database in turn
foreach (var database in databases)
{
var healthCheck =
await CheckProjectionHealthAsync(database, projectionsToCheck, cancellationToken)
.ConfigureAwait(false);
if (healthCheck.Status != HealthStatus.Healthy)
{
return healthCheck;
}
}
}

return CheckProjectionHealthAsync(database, projectionsToCheck, cancellationToken);
return HealthCheckResult.Healthy("Healthy");
}
catch (Exception ex)
{
return Task.FromResult(HealthCheckResult.Unhealthy($"Unhealthy: {ex.Message}", ex));
return HealthCheckResult.Unhealthy($"Unhealthy: {ex.Message}", ex);
}
}

public async Task<HealthCheckResult> CheckProjectionHealthAsync(IMartenDatabase database, HashSet<string> projectionsToCheck,
public async Task<HealthCheckResult> CheckProjectionHealthAsync(IMartenDatabase database,
HashSet<string> projectionsToCheck,
CancellationToken cancellationToken)
{
var allProgress = await database.AllProjectionProgress(token: cancellationToken)
var allProgress = await database.AllProjectionProgress(cancellationToken)
.ConfigureAwait(true);

var highWaterMark = allProgress.FirstOrDefault(x => string.Equals("HighWaterMark", x.ShardName));
if (highWaterMark is null)
{
return Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult.Healthy("Healthy");
return HealthCheckResult.Healthy("Healthy");
}

var projectionMarks = allProgress.Where(x => !string.Equals("HighWaterMark", x.ShardName)).ToArray();
Expand All @@ -136,10 +168,10 @@ public async Task<HealthCheckResult> CheckProjectionHealthAsync(IMartenDatabase
if (_maxSameLagTime is null)
{
return laggingProjections.Any()
? Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult.Unhealthy(
? HealthCheckResult.Unhealthy(
$"Unhealthy: Async projection sequence is more than {_maxEventLag} events behind for projection(s): {laggingProjections.Select(x => x.ShardName).Join(", ")}"
)
: Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult.Healthy("Healthy");
: HealthCheckResult.Healthy("Healthy");
}

var now = _timeProvider.GetUtcNow().UtcDateTime;
Expand Down Expand Up @@ -169,10 +201,10 @@ public async Task<HealthCheckResult> CheckProjectionHealthAsync(IMartenDatabase
}

return projectionsLaggingWithSamePositionForGivenTime.Any()
? Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult.Unhealthy(
? HealthCheckResult.Unhealthy(
$"Unhealthy: Async projection sequence is more than {_maxEventLag} events behind with same sequence for more than {_maxSameLagTime} for projection(s): {projectionsLaggingWithSamePositionForGivenTime.Select(x => x.ShardName).Join(", ")}"
)
: Microsoft.Extensions.Diagnostics.HealthChecks.HealthCheckResult.Healthy("Healthy");
: HealthCheckResult.Healthy("Healthy");
}
}
}

0 comments on commit 504f036

Please sign in to comment.