Skip to content

Commit

Permalink
Adding new metrics and activity emitting for the high water mark. Closes
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Nov 19, 2024
1 parent bc37a05 commit 9f84dda
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 7 deletions.
21 changes: 21 additions & 0 deletions docs/events/projections/async-daemon.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,24 @@ The `gap` metrics are a good health check on the performance of any given projec
is growing, that's a sign that your projection or subscription isn't being able to keep up with the incoming
events
:::

## High Water Mark <Badge type="tip" text="7.33" />

One of the possible issues in Marten operation is "event skipping" in the async daemon where the high water mark
detection grows "stale" because of gaps in the event sequence (generally caused by either very slow outstanding transactions or errors)
and Marten emits an error message like this in the log file:

*"High Water agent is stale after threshold of {DelayInSeconds} seconds, skipping gap to events marked after {SafeHarborTime} for database {Name}"*

With the recent prevalence of [Open Telemetry](https://opentelemetry.io/) tooling in the software industry, Marten
is now emitting Open Telemetry spans and metrics around the high water mark detection in the async daemon.

First off, Marten is emitting spans named either `marten.daemon.highwatermark` in the case of
only targeting a single database, or `marten.[database name].daemon.highwatermark` in the case of
using multi-tenancy through a database per tenant. On these spans will be these tags:

* `sequence` -- the largest event sequence that has been assigned to the database at this point
* `status` -- either `CaughtUp`, `Changed`, or `Stale` meaning "all good", "proceeding normally", or "uh, oh, something is up with outstanding transactions"
* `current.mark` -- the current, detected "high water mark" where Marten says is the ceiling on where events can be safely processed
* `skipped` -- this tag will only be present as a "true" value if Marten is forcing the high water detection to skip stale gaps in the event sequence
* `last.mark` -- if skipping event sequences, this will be the last good mark before the high water detection calculated the skip
42 changes: 40 additions & 2 deletions src/Marten/Events/Daemon/HighWater/HighWaterAgent.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using JasperFx.Core;
using Marten.Internal.OpenTelemetry;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Trace;
using Timer = System.Timers.Timer;

namespace Marten.Events.Daemon.HighWater;
Expand All @@ -19,9 +23,12 @@ internal class HighWaterAgent: IDisposable
private HighWaterStatistics _current;
private Task<Task> _loop;
private CancellationToken _token;
private readonly string _spanName;
private readonly Counter<int> _skipping;

// ReSharper disable once ContextualLoggerProblem
public HighWaterAgent(IHighWaterDetector detector, ShardStateTracker tracker, ILogger logger,
public HighWaterAgent(Meter meter, IHighWaterDetector detector, ShardStateTracker tracker,
ILogger logger,
DaemonSettings settings, CancellationToken token)
{
_detector = detector;
Expand All @@ -32,6 +39,11 @@ public HighWaterAgent(IHighWaterDetector detector, ShardStateTracker tracker, IL

_timer = new Timer(_settings.HealthCheckPollingTime.TotalMilliseconds) { AutoReset = true };
_timer.Elapsed += TimerOnElapsed;

_spanName = detector.DatabaseName.EqualsIgnoreCase("Marten") ? "marten.daemon.highwatermark" : $"marten.{_detector.DatabaseName.ToLowerInvariant()}.daemon.highwatermark";

var meterName = detector.DatabaseName.EqualsIgnoreCase("Marten") ? "marten.daemon.skipping" : $"marten.{_detector.DatabaseName.ToLowerInvariant()}.daemon.skipping";
_skipping = meter.CreateCounter<int>(meterName);
}

public bool IsRunning { get; private set; }
Expand Down Expand Up @@ -90,6 +102,8 @@ private async Task detectChanges()
break;
}

using var activity = MartenTracing.StartActivity(_spanName);

HighWaterStatistics statistics = null;
try
{
Expand All @@ -104,17 +118,21 @@ private async Task detectChanges()

_logger.LogError(ex, "Failed while trying to detect high water statistics for database {Name}", _detector.DatabaseName);
await Task.Delay(_settings.SlowPollingTime, _token).ConfigureAwait(false);

activity?.RecordException(ex);

continue;

}
catch (Exception e)
{
_logger.LogError(e, "Failed while trying to detect high water statistics for database {Name}", _detector.DatabaseName);
activity?.RecordException(e);
await Task.Delay(_settings.SlowPollingTime, _token).ConfigureAwait(false);
continue;
}

var status = statistics.InterpretStatus(_current);
var status = tagActivity(statistics, activity);

switch (status)
{
Expand Down Expand Up @@ -142,7 +160,17 @@ private async Task detectChanges()
"High Water agent is stale after threshold of {DelayInSeconds} seconds, skipping gap to events marked after {SafeHarborTime} for database {Name}",
_settings.StaleSequenceThreshold.TotalSeconds, safeHarborTime, _detector.DatabaseName);

activity?.SetTag("skipped", "true");

var lastKnown = statistics.CurrentMark;

statistics = await _detector.DetectInSafeZone(_token).ConfigureAwait(false);

status = tagActivity(statistics, activity);
activity?.SetTag("last.mark", lastKnown);

_skipping.Add(1);

await markProgressAsync(statistics, _settings.FastPollingTime, status).ConfigureAwait(false);
break;
}
Expand All @@ -151,6 +179,16 @@ private async Task detectChanges()
_logger.LogInformation("HighWaterAgent has detected a cancellation and has stopped polling for database {Name}", _detector.DatabaseName);
}

private HighWaterStatus tagActivity(HighWaterStatistics statistics, Activity activity)
{
var status = statistics.InterpretStatus(_current);

activity?.AddTag("sequence", statistics.HighestSequence);
activity?.AddTag("status", status.ToString());
activity?.AddTag("current.mark", statistics.CurrentMark);
return status;
}

private async Task markProgressAsync(HighWaterStatistics statistics, TimeSpan delayTime, HighWaterStatus status)
{
if (!IsRunning)
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Daemon/ProjectionDaemon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public ProjectionDaemon(DocumentStore store, MartenDatabase database, ILogger lo
_factory = factory;
Logger = logger;
Tracker = Database.Tracker;
_highWater = new HighWaterAgent(detector, Tracker, logger, store.Options.Projections, _cancellation.Token);
_highWater = new HighWaterAgent(store.Options.OpenTelemetry.Meter, detector, Tracker, logger, store.Options.Projections, _cancellation.Token);

_breakSubscription = database.Tracker.Subscribe(this);

Expand Down
11 changes: 7 additions & 4 deletions src/Marten/Subscriptions/SubscriptionWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@ public ScopedSubscriptionServiceWrapper(IServiceProvider provider)
var scope = _provider.CreateAsyncScope();
var sp = scope.ServiceProvider;

var subscription = sp.GetRequiredService<T>().As<SubscriptionBase>();
IncludedEventTypes.AddRange(subscription.IncludedEventTypes);
StreamType = subscription.StreamType;
IncludeArchivedEvents = subscription.IncludeArchivedEvents;
var subscription = sp.GetRequiredService<T>() as SubscriptionBase;
if (subscription != null)
{
IncludedEventTypes.AddRange(subscription.IncludedEventTypes);
StreamType = subscription.StreamType;
IncludeArchivedEvents = subscription.IncludeArchivedEvents;
}
scope.SafeDispose();
}

Expand Down

0 comments on commit 9f84dda

Please sign in to comment.