diff --git a/CHANGELOG.md b/CHANGELOG.md index ca1a0d10..9797b603 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ Represents the **NuGet** versions. +## v2.10.0 +- *Enhancement:* Added `IServiceBusSubscriber` with following properties: `AbandonOnTransient` (perform an Abandon versus bubbling exception), `MaxDeliveryCount` (max delivery count check within subscriber), `RetryDelay` (basic transient retry specification) and `MaxRetryDelay` (defines upper bounds of retry delay). These are defaulted from corresponding `IConfiguration` settings. Both the `ServiveBusSubscriber` and `ServiveBusOrchestratedSubscriber` implement; related logic within `ServiceBusSubscriberInvoker`. +- *Enhancement:* Added `RetryAfterSeconds` to `TransientException` to allow overriding; defaults to `120` seconds. +- *Fixed:* Log output from subscribers will no longer write exception stack trace where known `IExtendedException` (noise reduction). +- *Fixed:* `ValidationException` message reformatted such that newlines are no longer included (message simplification). + ## v2.9.1 - *Fixed:* The dead-lettering within `ServiceBusSubscriberInvoker` will write the exception stack trace, etc. to a new message property named `SubscriberException` to ensure this content is consistently persisted, with related error description being the exception message only. diff --git a/Common.targets b/Common.targets index 07f445f0..364775ce 100644 --- a/Common.targets +++ b/Common.targets @@ -1,6 +1,6 @@  - 2.9.1 + 2.10.0 preview Avanade Avanade diff --git a/samples/My.Hr/My.Hr.Infra.Tests/My.Hr.Infra.Tests.csproj b/samples/My.Hr/My.Hr.Infra.Tests/My.Hr.Infra.Tests.csproj index de9664fb..a1de6955 100644 --- a/samples/My.Hr/My.Hr.Infra.Tests/My.Hr.Infra.Tests.csproj +++ b/samples/My.Hr/My.Hr.Infra.Tests/My.Hr.Infra.Tests.csproj @@ -22,7 +22,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - + diff --git a/samples/My.Hr/My.Hr.UnitTest/My.Hr.UnitTest.csproj b/samples/My.Hr/My.Hr.UnitTest/My.Hr.UnitTest.csproj index f770fa5b..b77442b1 100644 --- a/samples/My.Hr/My.Hr.UnitTest/My.Hr.UnitTest.csproj +++ b/samples/My.Hr/My.Hr.UnitTest/My.Hr.UnitTest.csproj @@ -27,8 +27,8 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + diff --git a/src/CoreEx.Azure/ServiceBus/IServiceBusSubscriber.cs b/src/CoreEx.Azure/ServiceBus/IServiceBusSubscriber.cs new file mode 100644 index 00000000..236555e7 --- /dev/null +++ b/src/CoreEx.Azure/ServiceBus/IServiceBusSubscriber.cs @@ -0,0 +1,40 @@ +// Copyright (c) Avanade. Licensed under the MIT License. See https://github.com/Avanade/CoreEx + +using Azure.Messaging.ServiceBus; +using CoreEx.Abstractions; +using Microsoft.Azure.WebJobs.ServiceBus; +using System; + +namespace CoreEx.Azure.ServiceBus +{ + /// + /// Defines the standardized Azure Service Bus subscriber properties. + /// + public interface IServiceBusSubscriber + { + /// + /// Indicates when true that an should be issued when a is encounterd; otherwise, when false allow the to bubble up the stack. + /// + bool AbandonOnTransient { get; set; } + + /// + /// Gets or sets the optional maximum delivery count before a corresponding will be issued. + /// + /// Where null this indicates that this checking is solely the responsibility of the Azure Service Bus infrastructure. This value can not exceed the corresponding Azure Service Bus configuration setting as that takes precedence. + int? MaxDeliveryCount { get; set; } + + /// + /// Get or sets the optional retry to define a multiplicative delay where an is encounterd. + /// + /// The is multiplied by this value to achieve the final multiplicative delay value. + /// This is performed after an unsuccessful transient processing attempt effectively continuing to lock the the for the duration of the delay before finally handling as defined by . + /// A is also performed directly before the delay explicitly renewing. + TimeSpan? RetryDelay { get; set; } + + /// + /// Gets or sets the optional maximum retry that represents the upper bounds of the multiplicative . + /// + /// Where a value is specified and the corresponding is null this value will be used only achieving a fixed delay value. + TimeSpan? MaxRetryDelay { get; set; } + } +} \ No newline at end of file diff --git a/src/CoreEx.Azure/ServiceBus/ServiceBusOrchestratedSubscriber.cs b/src/CoreEx.Azure/ServiceBus/ServiceBusOrchestratedSubscriber.cs index 42e20edc..0688f747 100644 --- a/src/CoreEx.Azure/ServiceBus/ServiceBusOrchestratedSubscriber.cs +++ b/src/CoreEx.Azure/ServiceBus/ServiceBusOrchestratedSubscriber.cs @@ -21,7 +21,7 @@ namespace CoreEx.Azure.ServiceBus /// message identifiers. Where the unhandled is this will bubble out for the Azure Function runtime/fabric to retry and automatically deadletter; otherwise, it will be /// immediately deadletted with a reason of or depending on the exception . /// The is invoked after each deserialization. - public class ServiceBusOrchestratedSubscriber : EventSubscriberBase + public class ServiceBusOrchestratedSubscriber : EventSubscriberBase, IServiceBusSubscriber { /// /// Initializes a new instance of the class. @@ -39,6 +39,10 @@ public ServiceBusOrchestratedSubscriber(EventSubscriberOrchestrator orchestrator { Orchestrator = orchestrator ?? throw new ArgumentNullException(nameof(orchestrator)); ServiceBusSubscriberInvoker = serviceBusSubscriberInvoker ?? (ServiceBusSubscriber._invoker ??= new ServiceBusSubscriberInvoker()); + AbandonOnTransient = settings.GetValue($"{GetType().Name}__{nameof(AbandonOnTransient)}", false); + MaxDeliveryCount = settings.GetValue($"{GetType().Name}__{nameof(MaxDeliveryCount)}"); + RetryDelay = settings.GetValue($"{GetType().Name}__{nameof(RetryDelay)}"); + MaxRetryDelay = settings.GetValue($"{GetType().Name}__{nameof(MaxRetryDelay)}"); } /// @@ -56,6 +60,18 @@ public ServiceBusOrchestratedSubscriber(EventSubscriberOrchestrator orchestrator /// protected new IEventDataConverter EventDataConverter => (IEventDataConverter)base.EventDataConverter; + /// + public bool AbandonOnTransient { get; set; } + + /// + public int? MaxDeliveryCount { get; set; } + + /// + public TimeSpan? RetryDelay { get; set; } + + /// + public TimeSpan? MaxRetryDelay { get; set; } + /// /// Encapsulates the execution of an leveraging the underlying to receive and process the message. /// diff --git a/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriber.cs b/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriber.cs index 6848830f..b9e12822 100644 --- a/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriber.cs +++ b/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriber.cs @@ -22,7 +22,7 @@ namespace CoreEx.Azure.ServiceBus /// message identifiers. Where the unhandled is this will bubble out for the Azure Function runtime/fabric to retry and automatically deadletter; otherwise, it will be /// immediately deadletted with a reason of or depending on the exception . /// The is invoked after each deserialization. - public class ServiceBusSubscriber : EventSubscriberBase + public class ServiceBusSubscriber : EventSubscriberBase, IServiceBusSubscriber { /// /// Gets the name to access the . @@ -48,7 +48,13 @@ public class ServiceBusSubscriber : EventSubscriberBase /// The optional . public ServiceBusSubscriber(ExecutionContext executionContext, SettingsBase settings, ILogger logger, EventSubscriberInvoker? eventSubscriberInvoker = null, ServiceBusSubscriberInvoker? serviceBusSubscriberInvoker = null, IEventDataConverter? eventDataConverter = null, IEventSerializer? eventSerializer = null) : base(eventDataConverter ?? new ServiceBusReceivedMessageEventDataConverter(eventSerializer ?? new CoreEx.Text.Json.EventDataSerializer()), executionContext, settings, logger, eventSubscriberInvoker) - => ServiceBusSubscriberInvoker = serviceBusSubscriberInvoker ?? (_invoker ??= new ServiceBusSubscriberInvoker()); + { + ServiceBusSubscriberInvoker = serviceBusSubscriberInvoker ?? (_invoker ??= new ServiceBusSubscriberInvoker()); + AbandonOnTransient = settings.GetValue($"{GetType().Name}__{nameof(AbandonOnTransient)}", false); + MaxDeliveryCount = settings.GetValue($"{GetType().Name}__{nameof(MaxDeliveryCount)}"); + RetryDelay = settings.GetValue($"{GetType().Name}__{nameof(RetryDelay)}"); + MaxRetryDelay = settings.GetValue($"{GetType().Name}__{nameof(MaxRetryDelay)}"); + } /// /// Gets the . @@ -60,6 +66,18 @@ public ServiceBusSubscriber(ExecutionContext executionContext, SettingsBase sett /// protected new IEventDataConverter EventDataConverter => (IEventDataConverter)base.EventDataConverter; + /// + public bool AbandonOnTransient { get; set; } + + /// + public int? MaxDeliveryCount { get; set; } + + /// + public TimeSpan? RetryDelay { get; set; } + + /// + public TimeSpan? MaxRetryDelay { get; set; } + /// /// Encapsulates the execution of an converting the into a corresponding (with no value) for processing. /// diff --git a/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriberInvoker.cs b/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriberInvoker.cs index 25b8aaf4..ed7d6d9a 100644 --- a/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriberInvoker.cs +++ b/src/CoreEx.Azure/ServiceBus/ServiceBusSubscriberInvoker.cs @@ -19,6 +19,7 @@ namespace CoreEx.Azure.ServiceBus public class ServiceBusSubscriberInvoker : InvokerBase { private const string SubscriberExceptionPropertyName = "SubscriberException"; + private const string SubscriberAbandonReasonPropertyName = "SubscriberAbandonReason"; /// protected async override Task OnInvokeAsync(EventSubscriberBase invoker, Func> func, (ServiceBusReceivedMessage Message, ServiceBusMessageActions MessageActions) args, CancellationToken cancellationToken) @@ -65,23 +66,12 @@ protected async override Task OnInvokeAsync(EventSubscriberBas } catch (Exception ex) { - if (ex is IExtendedException eex) - { - if (eex.IsTransient) - { - // Do not abandon the message when transient, as there may be a Retry Policy configured; otherwise, it should eventaully be dead-lettered by the host/runtime/fabric. - invoker.Logger.LogWarning(ex, "Retry - Service Bus message '{Message}'. [{Reason}] Processing attempt {Count}. {Error}", args.Message.MessageId, eex.ErrorType, args.Message.DeliveryCount, ex.Message); - OnAfterMessageProcessing(invoker, args.Message, ex); - throw; - } + var keepThrowing = await HandleExceptionAsync(invoker, args.Message, args.MessageActions, ex, cancellationToken).ConfigureAwait(false); + OnAfterMessageProcessing(invoker, args.Message, ex); - await DeadLetterExceptionAsync(invoker, args.Message, args.MessageActions, eex.ErrorType, ex, cancellationToken).ConfigureAwait(false); - } - else - await DeadLetterExceptionAsync(invoker, args.Message, args.MessageActions, ErrorType.UnhandledError.ToString(), ex, cancellationToken).ConfigureAwait(false); + if (keepThrowing) + throw; - // It's been handled, swallow the exception and carry on. - OnAfterMessageProcessing(invoker, args.Message, ex); return default!; } finally @@ -90,14 +80,85 @@ protected async override Task OnInvokeAsync(EventSubscriberBas } } + /// + /// Handle the exception. + /// + private static async Task HandleExceptionAsync(EventSubscriberBase invoker, ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, Exception exception, CancellationToken cancellationToken) + { + // Handle a known exception type. + if (exception is IExtendedException eex) + { + // Where not considered transient then dead-letter. + if (!eex.IsTransient) + { + await DeadLetterExceptionAsync(invoker, message, messageActions, eex.ErrorType, exception, cancellationToken).ConfigureAwait(false); + return false; + } + + // Determine the delay, if any. + var sbs = invoker as IServiceBusSubscriber; + var delay = sbs is not null && sbs.RetryDelay.HasValue ? (int)(sbs.RetryDelay.Value.TotalMilliseconds * message.DeliveryCount) : -1; + if (sbs is not null && sbs.MaxRetryDelay.HasValue) + { + if (delay < 0 || delay > sbs.MaxRetryDelay.Value.TotalMilliseconds) + delay = (int)sbs.MaxRetryDelay.Value.TotalMilliseconds; + } + + // Where the exception is known then exception and stack trace need not be logged. + var ex = exception is EventSubscriberException esex && esex.HasInnerExtendedException ? null : exception; + + // Log the transient retry as a warning. + if (delay <= 0) + invoker.Logger.LogWarning(ex, "Retry - Service Bus message '{Message}'. [{Reason}] Processing attempt {Count}. {Error}", message.MessageId, eex.ErrorType, message.DeliveryCount, exception.Message); + else + invoker.Logger.LogWarning(ex, "Retry - Service Bus message '{Message}'. [{Reason}] Processing attempt {Count}; retry delay {Delay}ms. {Error}", message.MessageId, eex.ErrorType, message.DeliveryCount, delay, exception.Message); + + if (sbs is not null) + { + if (sbs.MaxDeliveryCount.HasValue && message.DeliveryCount >= sbs.MaxDeliveryCount.Value) + { + // Dead-letter when maximum delivery count achieved. + await DeadLetterExceptionAsync(invoker, message, messageActions, "MaxDeliveryCountExceeded", new EventSubscriberException($"Message could not be consumed after {sbs.MaxDeliveryCount.Value} attempts (as defined by {invoker.GetType().Name}).", exception), cancellationToken).ConfigureAwait(false); + return false; + } + + if (delay > 0) + { + // Renew the lock to maximize time and then delay. + await messageActions.RenewMessageLockAsync(message, cancellationToken).ConfigureAwait(false); + invoker.Logger.LogDebug("Retry delaying - Service Bus message '{Message}'. Retry delay {Delay}ms.", message.MessageId, delay); + await Task.Delay(delay, cancellationToken).ConfigureAwait(false); + invoker.Logger.LogDebug("Retry delayed - Service Bus message '{Message}'.", message.MessageId, delay); + } + + if (sbs.AbandonOnTransient) + { + // Abandon message versus bubbling. + invoker.Logger.LogDebug("Abandoning - Service Bus message '{Message}'.", message.MessageId); + await messageActions.AbandonMessageAsync(message, new Dictionary { { SubscriberAbandonReasonPropertyName, FormatText(exception.Message) } }, cancellationToken).ConfigureAwait(false); + invoker.Logger.LogDebug("Abandoned - Service Bus message '{Message}'.", message.MessageId); + return false; + } + } + + return true; // Keep throwing; i.e. bubble exception. + } + + // Dead-letter the unhandled exception. + await DeadLetterExceptionAsync(invoker, message, messageActions, ErrorType.UnhandledError.ToString(), exception, cancellationToken).ConfigureAwait(false); + return false; + } + /// /// Performs the dead-lettering. /// public static async Task DeadLetterExceptionAsync(EventSubscriberBase invoker, ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, string errorReason, Exception exception, CancellationToken cancellationToken) { + var ex = exception is EventSubscriberException esex && esex.HasInnerExtendedException ? null : exception; + invoker.Logger.LogDebug("Dead Lettering - Service Bus message '{Message}'. [{Reason}] {Error}", message.MessageId, errorReason, exception.Message); await messageActions.DeadLetterMessageAsync(message, new Dictionary { { SubscriberExceptionPropertyName, FormatText(exception.ToString()) } }, errorReason, FormatText(exception.Message), cancellationToken).ConfigureAwait(false); - invoker.Logger.LogError(exception, "Dead Lettered - Service Bus message '{Message}'. [{Reason}] {Error}", message.MessageId, errorReason, exception.Message); + invoker.Logger.LogError(ex, "Dead Lettered - Service Bus message '{Message}'. [{Reason}] {Error}", message.MessageId, errorReason, exception.Message); } /// diff --git a/src/CoreEx.Database.SqlServer/Outbox/EventOutboxEnqueueBase.cs b/src/CoreEx.Database.SqlServer/Outbox/EventOutboxEnqueueBase.cs index f668033b..47c2d336 100644 --- a/src/CoreEx.Database.SqlServer/Outbox/EventOutboxEnqueueBase.cs +++ b/src/CoreEx.Database.SqlServer/Outbox/EventOutboxEnqueueBase.cs @@ -19,7 +19,7 @@ namespace CoreEx.Database.SqlServer.Outbox /// By default the events are first sent/enqueued to the database outbox, then a secondary process dequeues and sends. Also, by enqueing to a single database outbox the event publishing order is preserved. /// This can however introduce unwanted latency depending on the frequency in which the secondary process performs the dequeue and send, as this is essentially a polling-based operation. To improve (minimize) latency, the primary /// can be specified using . This will then be used to send the events immediately, and where successful, they will be audited in the database as dequeued - /// event(s); versus on error (as a backup), where they will be enqueued for the out-of-process dequeue and send (as per default). Note: the challenge this introduces is in-order publishing; there is no means to guarantee order for the + /// event(s); versus on error (as a backup), where they will be enqueued for the out-of-process dequeue and send (as per default). Note: the challenge this primary sender introduces is in-order publishing; there is no means to guarantee order for the /// events that are processed on error. public abstract class EventOutboxEnqueueBase : IEventSender { diff --git a/src/CoreEx/Events/EventSubscriberException.cs b/src/CoreEx/Events/EventSubscriberException.cs index 7863659d..719b693f 100644 --- a/src/CoreEx/Events/EventSubscriberException.cs +++ b/src/CoreEx/Events/EventSubscriberException.cs @@ -37,6 +37,11 @@ public EventSubscriberException(string message) : base(message) { } /// private IExtendedException? InnerExtendedException => InnerException is IExtendedException eex ? eex : null; + /// + /// Indicates that there is an and that it implements . + /// + public bool HasInnerExtendedException => InnerException is IExtendedException; + /// /// Gets the error type/reason. /// diff --git a/src/CoreEx/Events/Subscribing/EventSubscriberInvoker.cs b/src/CoreEx/Events/Subscribing/EventSubscriberInvoker.cs index 85ba17d9..a056f706 100644 --- a/src/CoreEx/Events/Subscribing/EventSubscriberInvoker.cs +++ b/src/CoreEx/Events/Subscribing/EventSubscriberInvoker.cs @@ -74,6 +74,10 @@ protected async override Task OnInvokeAsync(IErrorHandling err /// An value of will be treated as ; should generally be handled prior to invocation. public virtual void HandleError(EventSubscriberException eventSubscriberException, ErrorHandling errorHandling, ILogger logger) { + // Where the exception is known then exception and stack trace need not be logged. + var ex = eventSubscriberException.HasInnerExtendedException ? null : eventSubscriberException; + + // Handle based on error handling configuration. switch (errorHandling) { case ErrorHandling.TransientRetry: @@ -81,7 +85,7 @@ public virtual void HandleError(EventSubscriberException eventSubscriberExceptio throw eventSubscriberException; case ErrorHandling.CriticalFailFast: - logger.LogCritical(eventSubscriberException, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); + logger.LogCritical(ex, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); FailFast(eventSubscriberException); goto case ErrorHandling.ThrowSubscriberException; // A backup in case FailFast does not function as expected. @@ -91,15 +95,15 @@ public virtual void HandleError(EventSubscriberException eventSubscriberExceptio throw eventSubscriberException; case ErrorHandling.CompleteWithInformation: - logger.LogInformation(eventSubscriberException, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); + logger.LogInformation(ex, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); break; case ErrorHandling.CompleteWithWarning: - logger.LogWarning(eventSubscriberException, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); + logger.LogWarning(ex, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); break; case ErrorHandling.CompleteWithError: - logger.LogError(eventSubscriberException, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); + logger.LogError(ex, LogFormat, eventSubscriberException.Message, eventSubscriberException.ExceptionSource, errorHandling.ToString()); break; } } diff --git a/src/CoreEx/TransientException.cs b/src/CoreEx/TransientException.cs index f32709eb..22cf08ee 100644 --- a/src/CoreEx/TransientException.cs +++ b/src/CoreEx/TransientException.cs @@ -74,7 +74,14 @@ public TransientException() : this(null!) { } /// The value. public bool ShouldBeLogged => ShouldExceptionBeLogged; + /// + /// Gets or sets the corresponding seconds. + /// + /// Defaults to 120 seconds. + public int RetryAfterSeconds { get; set; } = 120; + /// + /// Sets the value to . public IActionResult ToResult() => new ExtendedContentResult { Content = Message, @@ -82,7 +89,7 @@ public TransientException() : this(null!) { } StatusCode = (int)StatusCode, BeforeExtension = r => { - r.GetTypedHeaders().Set(HeaderNames.RetryAfter, "120"); + r.GetTypedHeaders().Set(HeaderNames.RetryAfter, RetryAfterSeconds); return Task.CompletedTask; } }; diff --git a/src/CoreEx/ValidationException.cs b/src/CoreEx/ValidationException.cs index 1cfb975f..be34cf8b 100644 --- a/src/CoreEx/ValidationException.cs +++ b/src/CoreEx/ValidationException.cs @@ -123,7 +123,7 @@ private static string CreateMessage(IEnumerable mic, string message var sb = new StringBuilder(message); foreach (var mi in mic.Where(x => x.Type == MessageType.Error)) { - sb.Append($"{Environment.NewLine}\t{mi.Property}: {mi.Text}"); + sb.Append($" [{mi.Property}: {mi.Text}]"); } return sb.ToString(); diff --git a/tests/CoreEx.Cosmos.Test/CoreEx.Cosmos.Test.csproj b/tests/CoreEx.Cosmos.Test/CoreEx.Cosmos.Test.csproj index e6082273..e18d3717 100644 --- a/tests/CoreEx.Cosmos.Test/CoreEx.Cosmos.Test.csproj +++ b/tests/CoreEx.Cosmos.Test/CoreEx.Cosmos.Test.csproj @@ -34,7 +34,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all - + diff --git a/tests/CoreEx.Test/CoreEx.Test.csproj b/tests/CoreEx.Test/CoreEx.Test.csproj index f71cdb50..59ea1ebc 100644 --- a/tests/CoreEx.Test/CoreEx.Test.csproj +++ b/tests/CoreEx.Test/CoreEx.Test.csproj @@ -18,7 +18,7 @@ runtime; build; native; contentfiles; analyzers; buildtransitive all - + diff --git a/tests/CoreEx.Test/TestFunction/ServiceBusSubsciberTest.cs b/tests/CoreEx.Test/TestFunction/ServiceBusSubsciberTest.cs new file mode 100644 index 00000000..1cbbfce0 --- /dev/null +++ b/tests/CoreEx.Test/TestFunction/ServiceBusSubsciberTest.cs @@ -0,0 +1,226 @@ +using CoreEx.Azure.ServiceBus; +using CoreEx.Events; +using CoreEx.TestFunction; +using CoreEx.TestFunction.Functions; +using Microsoft.Azure.WebJobs.ServiceBus; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using NUnit.Framework; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using UnitTestEx.NUnit; + +namespace CoreEx.Test.TestFunction +{ + [TestFixture] + public class ServiceBusSubsciberTest + { + [Test] + public void NoAbandonOnTransient() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = false; + + Assert.ThrowsAsync(() => sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please."))); + + actions.AssertRenew(0); + actions.AssertNone(); + } + + [Test] + public async Task AbandonOnTransient() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + + actions.AssertRenew(0); + actions.AssertAbandon(); + + Assert.IsNotNull(actions.PropertiesModified); + Assert.AreEqual(actions.PropertiesModified!["SubscriberAbandonReason"], "Retry again please."); + } + + [Test] + public async Task RetryDelay_DeliveryCount1() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + sbs.RetryDelay = TimeSpan.FromSeconds(1); + + var sw = Stopwatch.StartNew(); + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + sw.Stop(); + + Assert.GreaterOrEqual(sw.ElapsedMilliseconds, 950); + + actions.AssertRenew(1); + actions.AssertAbandon(); + + Assert.IsNotNull(actions.PropertiesModified); + Assert.AreEqual(actions.PropertiesModified!["SubscriberAbandonReason"], "Retry again please."); + } + + [Test] + public async Task RetryDelay_DeliveryCount2() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }, m => m.Header.DeliveryCount = 2); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + sbs.RetryDelay = TimeSpan.FromSeconds(1); + + var sw = Stopwatch.StartNew(); + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + sw.Stop(); + + Assert.GreaterOrEqual(sw.ElapsedMilliseconds, 1950); + + actions.AssertRenew(1); + actions.AssertAbandon(); + + Assert.IsNotNull(actions.PropertiesModified); + Assert.AreEqual(actions.PropertiesModified!["SubscriberAbandonReason"], "Retry again please."); + } + + [Test] + public async Task RetryDelay_DeliveryCount2_WithMax() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }, m => m.Header.DeliveryCount = 2); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + sbs.RetryDelay = TimeSpan.FromSeconds(1); + sbs.MaxRetryDelay = TimeSpan.FromMilliseconds(1100); + + var sw = Stopwatch.StartNew(); + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + sw.Stop(); + + Assert.GreaterOrEqual(sw.ElapsedMilliseconds, 1050); + + actions.AssertRenew(1); + actions.AssertAbandon(); + + Assert.IsNotNull(actions.PropertiesModified); + Assert.AreEqual(actions.PropertiesModified!["SubscriberAbandonReason"], "Retry again please."); + } + + [Test] + public async Task RetryDelay_DeliveryCount2_WithMaxOnly() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }, m => m.Header.DeliveryCount = 2); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + sbs.MaxRetryDelay = TimeSpan.FromMilliseconds(600); + + var sw = Stopwatch.StartNew(); + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + sw.Stop(); + + Assert.GreaterOrEqual(sw.ElapsedMilliseconds, 550); + + actions.AssertRenew(1); + actions.AssertAbandon(); + + Assert.IsNotNull(actions.PropertiesModified); + Assert.AreEqual(actions.PropertiesModified!["SubscriberAbandonReason"], "Retry again please."); + } + + [Test] + public async Task MaxDeliveryCount_LessThan() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }, m => m.Header.DeliveryCount = 2); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + sbs.MaxDeliveryCount = 3; + + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + + actions.AssertRenew(0); + actions.AssertAbandon(); + + Assert.IsNotNull(actions.PropertiesModified); + Assert.AreEqual(actions.PropertiesModified!["SubscriberAbandonReason"], "Retry again please."); + } + + [Test] + public async Task MaxDeliveryCount_EqualTo() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }, m => m.Header.DeliveryCount = 3); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + sbs.MaxDeliveryCount = 3; + + await sbs.ReceiveAsync(message, actions, _ => throw new TransientException("Retry again please.")); + + actions.AssertRenew(0); + actions.AssertDeadLetter("MaxDeliveryCountExceeded", "Message could not be consumed after 3 attempts (as defined by ServiceBusSubscriber)."); + Assert.IsNotNull(actions.PropertiesModified); + Assert.IsNotNull(actions.PropertiesModified!["SubscriberException"]); + } + + [Test] + public async Task Complete() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }, m => m.Header.DeliveryCount = 3); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + + await sbs.ReceiveAsync(message, actions, _ => Task.CompletedTask); + + actions.AssertRenew(0); + actions.AssertComplete(); + } + + [Test] + public async Task Unhandled_DeadLetter() + { + using var test = FunctionTester.Create(); + var actions = test.CreateServiceBusMessageActions(); + var message = test.CreateServiceBusMessage(new { id = "A", name = "B", price = 1.99m }); + + var sbs = test.Services.GetRequiredService(); + sbs.AbandonOnTransient = true; + + await sbs.ReceiveAsync(message, actions, _ => throw new DivideByZeroException("Zero is bad dude!")); + + actions.AssertRenew(0); + actions.AssertDeadLetter("UnhandledError", "Zero is bad dude!"); + } + } +} \ No newline at end of file