Skip to content

Commit

Permalink
Fix event load exceptions causing rebuilds to finish early
Browse files Browse the repository at this point in the history
  • Loading branch information
Hawxy committed Jul 9, 2024
1 parent 499dd03 commit 3aa7aa9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
29 changes: 25 additions & 4 deletions src/Marten/Events/Daemon/Internals/EventLoader.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -14,7 +15,7 @@

namespace Marten.Events.Daemon.Internals;

internal class EventLoader: IEventLoader
internal sealed class EventLoader: IEventLoader
{
private readonly int _aggregateIndex;
private readonly int _batchSize;
Expand All @@ -23,7 +24,6 @@ internal class EventLoader: IEventLoader
private readonly NpgsqlParameter _floor;
private readonly IEventStorage _storage;
private readonly IDocumentStore _store;

public EventLoader(DocumentStore store, MartenDatabase database, AsyncProjectionShard shard, AsyncOptions options) : this(store, database, options, shard.BuildFilters(store).ToArray())
{

Expand Down Expand Up @@ -80,6 +80,8 @@ public async Task<EventPage> LoadAsync(EventRequest request,
_floor.Value = request.Floor;
_ceiling.Value = request.HighWater;

var skippedEvents = 0;

await using var reader = await session.ExecuteReaderAsync(_command, token).ConfigureAwait(false);
while (await reader.ReadAsync(token).ConfigureAwait(false))
{
Expand All @@ -100,7 +102,8 @@ public async Task<EventPage> LoadAsync(EventRequest request,
{
if (request.ErrorOptions.SkipUnknownEvents)
{
request.Runtime.Logger.LogWarning("Skipping unknown event type '{EventTypeName}'", e.EventTypeName);
request.Runtime.Logger.EventUnknown(e.EventTypeName);
skippedEvents++;
}
else
{
Expand All @@ -112,7 +115,10 @@ public async Task<EventPage> LoadAsync(EventRequest request,
{
if (request.ErrorOptions.SkipSerializationErrors)
{
request.Runtime.Logger.EventDeserializationException(e.InnerException!.GetType().Name!, e.Sequence);
request.Runtime.Logger.EventDeserializationExceptionDebug(e);
await request.Runtime.RecordDeadLetterEventAsync(e.ToDeadLetterEvent(request.Name)).ConfigureAwait(false);
skippedEvents++;
}
else
{
Expand All @@ -122,8 +128,23 @@ public async Task<EventPage> LoadAsync(EventRequest request,
}
}

page.CalculateCeiling(_batchSize, request.HighWater);
page.CalculateCeiling(_batchSize, request.HighWater, skippedEvents);

return page;
}

}

internal static partial class Log
{
[LoggerMessage(LogLevel.Warning, "Skipping unknown event type '{EventTypeName}'")]
public static partial void EventUnknown(this ILogger logger, string eventTypeName);

[LoggerMessage(LogLevel.Warning,"Suppressed Serialization exception of type {ExceptionName} occured whilst loading event at sequence {Sequence}. Enable debug logging or disable SkipSerializationErrors for full stack trace.")]
public static partial void EventDeserializationException(this ILogger logger, string exceptionName, long sequence);

[LoggerMessage(LogLevel.Debug)]
public static partial void EventDeserializationExceptionDebug(this ILogger logger, Exception exception);
}


4 changes: 2 additions & 2 deletions src/Marten/Events/Daemon/Internals/EventPage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ public EventPage(long floor)
public long Floor { get; }
public long Ceiling { get; private set; }

public void CalculateCeiling(int batchSize, long highWaterMark)
public void CalculateCeiling(int batchSize, long highWaterMark, int skippedEvents)
{
Ceiling = Count == batchSize
Ceiling = (Count + skippedEvents) == batchSize
? this.Last().Sequence
: highWaterMark;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Daemon/Internals/SubscriptionAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public async Task StartAsync(SubscriptionExecutionRequest request)
public async Task ReplayAsync(SubscriptionExecutionRequest request, long highWaterMark, TimeSpan timeout)
{
Mode = ShardExecutionMode.Rebuild;
_rebuild = new TaskCompletionSource();
_rebuild = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_execution.Mode = ShardExecutionMode.Rebuild;
ErrorOptions = request.ErrorHandling;
_runtime = request.Runtime;
Expand Down

0 comments on commit 3aa7aa9

Please sign in to comment.