Skip to content

Commit

Permalink
V2.10.0 (#65)
Browse files Browse the repository at this point in the history
* `IServiceBusSubscriber` and fixes.

* Doco tweak.

* TransientException retry after property
  • Loading branch information
chullybun authored Apr 27, 2023
1 parent ae8a3bc commit 3befc85
Show file tree
Hide file tree
Showing 16 changed files with 415 additions and 32 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

Represents the **NuGet** versions.

## v2.10.0
- *Enhancement:* Added `IServiceBusSubscriber` with following properties: `AbandonOnTransient` (perform an Abandon versus bubbling exception), `MaxDeliveryCount` (max delivery count check within subscriber), `RetryDelay` (basic transient retry specification) and `MaxRetryDelay` (defines upper bounds of retry delay). These are defaulted from corresponding `IConfiguration` settings. Both the `ServiveBusSubscriber` and `ServiveBusOrchestratedSubscriber` implement; related logic within `ServiceBusSubscriberInvoker`.
- *Enhancement:* Added `RetryAfterSeconds` to `TransientException` to allow overriding; defaults to `120` seconds.
- *Fixed:* Log output from subscribers will no longer write exception stack trace where known `IExtendedException` (noise reduction).
- *Fixed:* `ValidationException` message reformatted such that newlines are no longer included (message simplification).

## v2.9.1
- *Fixed:* The dead-lettering within `ServiceBusSubscriberInvoker` will write the exception stack trace, etc. to a new message property named `SubscriberException` to ensure this content is consistently persisted, with related error description being the exception message only.

Expand Down
2 changes: 1 addition & 1 deletion Common.targets
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Version>2.9.1</Version>
<Version>2.10.0</Version>
<LangVersion>preview</LangVersion>
<Authors>Avanade</Authors>
<Company>Avanade</Company>
Expand Down
2 changes: 1 addition & 1 deletion samples/My.Hr/My.Hr.Infra.Tests/My.Hr.Infra.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="UnitTestEx.NUnit" Version="2.2.1" />
<PackageReference Include="UnitTestEx.NUnit" Version="2.2.3" />
</ItemGroup>

<ItemGroup>
Expand Down
4 changes: 2 additions & 2 deletions samples/My.Hr/My.Hr.UnitTest/My.Hr.UnitTest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="UnitTestEx" Version="2.2.1" />
<PackageReference Include="UnitTestEx.NUnit" Version="2.2.1" />
<PackageReference Include="UnitTestEx" Version="2.2.3" />
<PackageReference Include="UnitTestEx.NUnit" Version="2.2.3" />
</ItemGroup>

<ItemGroup>
Expand Down
40 changes: 40 additions & 0 deletions src/CoreEx.Azure/ServiceBus/IServiceBusSubscriber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/CoreEx

using Azure.Messaging.ServiceBus;
using CoreEx.Abstractions;
using Microsoft.Azure.WebJobs.ServiceBus;
using System;

namespace CoreEx.Azure.ServiceBus
{
/// <summary>
/// Defines the standardized Azure Service Bus subscriber properties.
/// </summary>
public interface IServiceBusSubscriber
{
/// <summary>
/// Indicates when <c>true</c> that an <see cref="ServiceBusMessageActions.AbandonMessageAsync"/> should be issued when a <see cref="IExtendedException.IsTransient"/> <see cref="Exception"/> is encounterd; otherwise, when <c>false</c> allow the <see cref="Exception"/> to bubble up the stack.
/// </summary>
bool AbandonOnTransient { get; set; }

/// <summary>
/// Gets or sets the optional maximum delivery count before a corresponding <see cref="ServiceBusMessageActions.DeadLetterMessageAsync(ServiceBusReceivedMessage, string, string, System.Threading.CancellationToken)"/> will be issued.
/// </summary>
/// <remarks>Where <c>null</c> this indicates that this checking is solely the responsibility of the Azure Service Bus infrastructure. This value can not exceed the corresponding Azure Service Bus configuration setting as that takes precedence.</remarks>
int? MaxDeliveryCount { get; set; }

/// <summary>
/// Get or sets the optional retry <see cref="TimeSpan"/> to define a multiplicative delay where an <see cref="IExtendedException.IsTransient"/> <see cref="Exception"/> is encounterd.
/// </summary>
/// <remarks>The <see cref="ServiceBusReceivedMessage.DeliveryCount"/> is multiplied by this value to achieve the final multiplicative delay value.
/// <para>This is performed after an unsuccessful transient processing attempt effectively continuing to lock the the <see cref="ServiceBusReceivedMessage"/> for the duration of the delay before finally handling as defined by <see cref="AbandonOnTransient"/>.
/// A <see cref="ServiceBusMessageActions.RenewMessageLockAsync(ServiceBusReceivedMessage, System.Threading.CancellationToken)"/> is also performed directly before the delay explicitly renewing.</para></remarks>
TimeSpan? RetryDelay { get; set; }

/// <summary>
/// Gets or sets the optional maximum retry <see cref="TimeSpan"/> that represents the upper bounds of the multiplicative <see cref="RetryDelay"/>.
/// </summary>
/// <remarks>Where a value is specified and the corresponding <see cref="RetryDelay"/> is <c>null</c> this value will be used only achieving a fixed delay value.</remarks>
TimeSpan? MaxRetryDelay { get; set; }
}
}
18 changes: 17 additions & 1 deletion src/CoreEx.Azure/ServiceBus/ServiceBusOrchestratedSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace CoreEx.Azure.ServiceBus
/// message identifiers. Where the unhandled <see cref="Exception"/> is <see cref="IExtendedException.IsTransient"/> this will bubble out for the Azure Function runtime/fabric to retry and automatically deadletter; otherwise, it will be
/// immediately deadletted with a reason of <see cref="IExtendedException.ErrorType"/> or <see cref="ErrorType.UnhandledError"/> depending on the exception <see cref="Type"/>.
/// <para>The <see cref="ServiceBusSubscriber.UpdateEventDataWithServiceBusMessage(EventData, ServiceBusReceivedMessage, ServiceBusMessageActions)"/> is invoked after each <see cref="EventData"/> deserialization.</para></remarks>
public class ServiceBusOrchestratedSubscriber : EventSubscriberBase
public class ServiceBusOrchestratedSubscriber : EventSubscriberBase, IServiceBusSubscriber
{
/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusOrchestratedSubscriber"/> class.
Expand All @@ -39,6 +39,10 @@ public ServiceBusOrchestratedSubscriber(EventSubscriberOrchestrator orchestrator
{
Orchestrator = orchestrator ?? throw new ArgumentNullException(nameof(orchestrator));
ServiceBusSubscriberInvoker = serviceBusSubscriberInvoker ?? (ServiceBusSubscriber._invoker ??= new ServiceBusSubscriberInvoker());
AbandonOnTransient = settings.GetValue($"{GetType().Name}__{nameof(AbandonOnTransient)}", false);
MaxDeliveryCount = settings.GetValue<int?>($"{GetType().Name}__{nameof(MaxDeliveryCount)}");
RetryDelay = settings.GetValue<TimeSpan?>($"{GetType().Name}__{nameof(RetryDelay)}");
MaxRetryDelay = settings.GetValue<TimeSpan?>($"{GetType().Name}__{nameof(MaxRetryDelay)}");
}

/// <summary>
Expand All @@ -56,6 +60,18 @@ public ServiceBusOrchestratedSubscriber(EventSubscriberOrchestrator orchestrator
/// </summary>
protected new IEventDataConverter<ServiceBusReceivedMessage> EventDataConverter => (IEventDataConverter<ServiceBusReceivedMessage>)base.EventDataConverter;

/// <inheritdoc/>
public bool AbandonOnTransient { get; set; }

/// <inheritdoc/>
public int? MaxDeliveryCount { get; set; }

/// <inheritdoc/>
public TimeSpan? RetryDelay { get; set; }

/// <inheritdoc/>
public TimeSpan? MaxRetryDelay { get; set; }

/// <summary>
/// Encapsulates the execution of an <see cref="ServiceBusReceivedMessage"/> leveraging the underlying <see cref="Orchestrator"/> to receive and process the message.
/// </summary>
Expand Down
22 changes: 20 additions & 2 deletions src/CoreEx.Azure/ServiceBus/ServiceBusSubscriber.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace CoreEx.Azure.ServiceBus
/// message identifiers. Where the unhandled <see cref="Exception"/> is <see cref="IExtendedException.IsTransient"/> this will bubble out for the Azure Function runtime/fabric to retry and automatically deadletter; otherwise, it will be
/// immediately deadletted with a reason of <see cref="IExtendedException.ErrorType"/> or <see cref="ErrorType.UnhandledError"/> depending on the exception <see cref="Type"/>.
/// <para>The <see cref="UpdateEventDataWithServiceBusMessage(EventData, ServiceBusReceivedMessage, ServiceBusMessageActions)"/> is invoked after each <see cref="EventData"/> deserialization.</para></remarks>
public class ServiceBusSubscriber : EventSubscriberBase
public class ServiceBusSubscriber : EventSubscriberBase, IServiceBusSubscriber
{
/// <summary>
/// Gets the <see cref="EventDataBase.Internal"/> name to access the <see cref="ServiceBusMessage"/>.
Expand All @@ -48,7 +48,13 @@ public class ServiceBusSubscriber : EventSubscriberBase
/// <param name="eventSerializer">The optional <see cref="IEventSerializer"/>.</param>
public ServiceBusSubscriber(ExecutionContext executionContext, SettingsBase settings, ILogger<ServiceBusSubscriber> logger, EventSubscriberInvoker? eventSubscriberInvoker = null, ServiceBusSubscriberInvoker? serviceBusSubscriberInvoker = null, IEventDataConverter<ServiceBusReceivedMessage>? eventDataConverter = null, IEventSerializer? eventSerializer = null)
: base(eventDataConverter ?? new ServiceBusReceivedMessageEventDataConverter(eventSerializer ?? new CoreEx.Text.Json.EventDataSerializer()), executionContext, settings, logger, eventSubscriberInvoker)
=> ServiceBusSubscriberInvoker = serviceBusSubscriberInvoker ?? (_invoker ??= new ServiceBusSubscriberInvoker());
{
ServiceBusSubscriberInvoker = serviceBusSubscriberInvoker ?? (_invoker ??= new ServiceBusSubscriberInvoker());
AbandonOnTransient = settings.GetValue($"{GetType().Name}__{nameof(AbandonOnTransient)}", false);
MaxDeliveryCount = settings.GetValue<int?>($"{GetType().Name}__{nameof(MaxDeliveryCount)}");
RetryDelay = settings.GetValue<TimeSpan?>($"{GetType().Name}__{nameof(RetryDelay)}");
MaxRetryDelay = settings.GetValue<TimeSpan?>($"{GetType().Name}__{nameof(MaxRetryDelay)}");
}

/// <summary>
/// Gets the <see cref="ServiceBus.ServiceBusSubscriberInvoker"/>.
Expand All @@ -60,6 +66,18 @@ public ServiceBusSubscriber(ExecutionContext executionContext, SettingsBase sett
/// </summary>
protected new IEventDataConverter<ServiceBusReceivedMessage> EventDataConverter => (IEventDataConverter<ServiceBusReceivedMessage>)base.EventDataConverter;

/// <inheritdoc/>
public bool AbandonOnTransient { get; set; }

/// <inheritdoc/>
public int? MaxDeliveryCount { get; set; }

/// <inheritdoc/>
public TimeSpan? RetryDelay { get; set; }

/// <inheritdoc/>
public TimeSpan? MaxRetryDelay { get; set; }

/// <summary>
/// Encapsulates the execution of an <see cref="ServiceBusReceivedMessage"/> <paramref name="function"/> converting the <paramref name="message"/> into a corresponding <see cref="EventData"/> (with no value) for processing.
/// </summary>
Expand Down
93 changes: 77 additions & 16 deletions src/CoreEx.Azure/ServiceBus/ServiceBusSubscriberInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace CoreEx.Azure.ServiceBus
public class ServiceBusSubscriberInvoker : InvokerBase<EventSubscriberBase, (ServiceBusReceivedMessage Message, ServiceBusMessageActions MessageActions)>
{
private const string SubscriberExceptionPropertyName = "SubscriberException";
private const string SubscriberAbandonReasonPropertyName = "SubscriberAbandonReason";

/// <inheritdoc/>
protected async override Task<TResult> OnInvokeAsync<TResult>(EventSubscriberBase invoker, Func<CancellationToken, Task<TResult>> func, (ServiceBusReceivedMessage Message, ServiceBusMessageActions MessageActions) args, CancellationToken cancellationToken)
Expand Down Expand Up @@ -65,23 +66,12 @@ protected async override Task<TResult> OnInvokeAsync<TResult>(EventSubscriberBas
}
catch (Exception ex)
{
if (ex is IExtendedException eex)
{
if (eex.IsTransient)
{
// Do not abandon the message when transient, as there may be a Retry Policy configured; otherwise, it should eventaully be dead-lettered by the host/runtime/fabric.
invoker.Logger.LogWarning(ex, "Retry - Service Bus message '{Message}'. [{Reason}] Processing attempt {Count}. {Error}", args.Message.MessageId, eex.ErrorType, args.Message.DeliveryCount, ex.Message);
OnAfterMessageProcessing(invoker, args.Message, ex);
throw;
}
var keepThrowing = await HandleExceptionAsync(invoker, args.Message, args.MessageActions, ex, cancellationToken).ConfigureAwait(false);
OnAfterMessageProcessing(invoker, args.Message, ex);

await DeadLetterExceptionAsync(invoker, args.Message, args.MessageActions, eex.ErrorType, ex, cancellationToken).ConfigureAwait(false);
}
else
await DeadLetterExceptionAsync(invoker, args.Message, args.MessageActions, ErrorType.UnhandledError.ToString(), ex, cancellationToken).ConfigureAwait(false);
if (keepThrowing)
throw;

// It's been handled, swallow the exception and carry on.
OnAfterMessageProcessing(invoker, args.Message, ex);
return default!;
}
finally
Expand All @@ -90,14 +80,85 @@ protected async override Task<TResult> OnInvokeAsync<TResult>(EventSubscriberBas
}
}

/// <summary>
/// Handle the exception.
/// </summary>
private static async Task<bool> HandleExceptionAsync(EventSubscriberBase invoker, ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, Exception exception, CancellationToken cancellationToken)
{
// Handle a known exception type.
if (exception is IExtendedException eex)
{
// Where not considered transient then dead-letter.
if (!eex.IsTransient)
{
await DeadLetterExceptionAsync(invoker, message, messageActions, eex.ErrorType, exception, cancellationToken).ConfigureAwait(false);
return false;
}

// Determine the delay, if any.
var sbs = invoker as IServiceBusSubscriber;
var delay = sbs is not null && sbs.RetryDelay.HasValue ? (int)(sbs.RetryDelay.Value.TotalMilliseconds * message.DeliveryCount) : -1;
if (sbs is not null && sbs.MaxRetryDelay.HasValue)
{
if (delay < 0 || delay > sbs.MaxRetryDelay.Value.TotalMilliseconds)
delay = (int)sbs.MaxRetryDelay.Value.TotalMilliseconds;
}

// Where the exception is known then exception and stack trace need not be logged.
var ex = exception is EventSubscriberException esex && esex.HasInnerExtendedException ? null : exception;

// Log the transient retry as a warning.
if (delay <= 0)
invoker.Logger.LogWarning(ex, "Retry - Service Bus message '{Message}'. [{Reason}] Processing attempt {Count}. {Error}", message.MessageId, eex.ErrorType, message.DeliveryCount, exception.Message);
else
invoker.Logger.LogWarning(ex, "Retry - Service Bus message '{Message}'. [{Reason}] Processing attempt {Count}; retry delay {Delay}ms. {Error}", message.MessageId, eex.ErrorType, message.DeliveryCount, delay, exception.Message);

if (sbs is not null)
{
if (sbs.MaxDeliveryCount.HasValue && message.DeliveryCount >= sbs.MaxDeliveryCount.Value)
{
// Dead-letter when maximum delivery count achieved.
await DeadLetterExceptionAsync(invoker, message, messageActions, "MaxDeliveryCountExceeded", new EventSubscriberException($"Message could not be consumed after {sbs.MaxDeliveryCount.Value} attempts (as defined by {invoker.GetType().Name}).", exception), cancellationToken).ConfigureAwait(false);
return false;
}

if (delay > 0)
{
// Renew the lock to maximize time and then delay.
await messageActions.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false);
invoker.Logger.LogDebug("Retry delaying - Service Bus message '{Message}'. Retry delay {Delay}ms.", message.MessageId, delay);
await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
invoker.Logger.LogDebug("Retry delayed - Service Bus message '{Message}'.", message.MessageId, delay);
}

if (sbs.AbandonOnTransient)
{
// Abandon message versus bubbling.
invoker.Logger.LogDebug("Abandoning - Service Bus message '{Message}'.", message.MessageId);
await messageActions.AbandonMessageAsync(message, new Dictionary<string, object?> { { SubscriberAbandonReasonPropertyName, FormatText(exception.Message) } }, cancellationToken).ConfigureAwait(false);
invoker.Logger.LogDebug("Abandoned - Service Bus message '{Message}'.", message.MessageId);
return false;
}
}

return true; // Keep throwing; i.e. bubble exception.
}

// Dead-letter the unhandled exception.
await DeadLetterExceptionAsync(invoker, message, messageActions, ErrorType.UnhandledError.ToString(), exception, cancellationToken).ConfigureAwait(false);
return false;
}

/// <summary>
/// Performs the dead-lettering.
/// </summary>
public static async Task DeadLetterExceptionAsync(EventSubscriberBase invoker, ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, string errorReason, Exception exception, CancellationToken cancellationToken)
{
var ex = exception is EventSubscriberException esex && esex.HasInnerExtendedException ? null : exception;

invoker.Logger.LogDebug("Dead Lettering - Service Bus message '{Message}'. [{Reason}] {Error}", message.MessageId, errorReason, exception.Message);
await messageActions.DeadLetterMessageAsync(message, new Dictionary<string, object?> { { SubscriberExceptionPropertyName, FormatText(exception.ToString()) } }, errorReason, FormatText(exception.Message), cancellationToken).ConfigureAwait(false);
invoker.Logger.LogError(exception, "Dead Lettered - Service Bus message '{Message}'. [{Reason}] {Error}", message.MessageId, errorReason, exception.Message);
invoker.Logger.LogError(ex, "Dead Lettered - Service Bus message '{Message}'. [{Reason}] {Error}", message.MessageId, errorReason, exception.Message);
}

/// <summary>
Expand Down
Loading

0 comments on commit 3befc85

Please sign in to comment.