From b87737e10906733aaa0d506541534cff057cd9bb Mon Sep 17 00:00:00 2001 From: MrSmoke <709976+MrSmoke@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:50:35 +1000 Subject: [PATCH 1/5] Update RabbitMQ.Client to 7.0.0-rc.9 --- .../Queue/src/BaseQueueHostedService.cs | 12 ++--- ...ClickView.GoodStuff.Queues.RabbitMq.csproj | 2 +- .../src/Internal/RabbitMqCallbackConsumer.cs | 45 +++++++------------ .../src/Internal/RabbitMqClientShims.cs | 30 ------------- src/Queues/RabbitMq/src/MessageContext.cs | 4 +- src/Queues/RabbitMq/src/RabbitMqClient.cs | 44 +++++++++--------- .../RabbitMq/src/RabbitMqClientOptions.cs | 9 ++++ .../RabbitMq/src/SubscriptionContext.cs | 26 +++++------ 8 files changed, 70 insertions(+), 102 deletions(-) delete mode 100644 src/Queues/RabbitMq/src/Internal/RabbitMqClientShims.cs diff --git a/src/Hosting/Queue/src/BaseQueueHostedService.cs b/src/Hosting/Queue/src/BaseQueueHostedService.cs index e24bb09..a7714c5 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedService.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedService.cs @@ -161,7 +161,7 @@ protected abstract Task SubscribeAsync(IQueueClient queueCl /// If true, acknowledge all outstanding delivery tags up to and including the delivery tag /// /// - protected Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default) + protected async Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default) { CheckDisposed(); @@ -169,11 +169,13 @@ protected Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, Cancel if (subContext is null) throw new InvalidOperationException("Cannot call acknowledge before starting the worker"); - if (subContext.IsOpen) - return subContext.AcknowledgeAsync(deliveryTag, multiple, cancellationToken); + if (!subContext.IsOpen) + { + Logger.AcknowledgeFailureChannelNotOpen(deliveryTag); + return; + } - Logger.AcknowledgeFailureChannelNotOpen(deliveryTag); - return Task.CompletedTask; + await subContext.AcknowledgeAsync(deliveryTag, multiple, cancellationToken); } private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory) diff --git a/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj b/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj index 643b4bc..faa69f5 100644 --- a/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj +++ b/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs b/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs index 508a408..c7bb4a9 100644 --- a/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs +++ b/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs @@ -4,31 +4,18 @@ namespace ClickView.GoodStuff.Queues.RabbitMq.Internal; using RabbitMQ.Client; using Serialization; -internal class RabbitMqCallbackConsumer : AsyncDefaultBasicConsumer +internal class RabbitMqCallbackConsumer( + SubscriptionContext subscriptionContext, + Func, CancellationToken, Task> callback, + CountWaiter taskWaiter, + IMessageSerializer serializer, + ILogger> logger) + : AsyncDefaultBasicConsumer { - private readonly SubscriptionContext _subscriptionContext; - private readonly Func, CancellationToken, Task> _callback; - private readonly CountWaiter _taskWaiter; - private readonly IMessageSerializer _serializer; - private readonly ILogger> _logger; - - public RabbitMqCallbackConsumer(SubscriptionContext subscriptionContext, - Func, CancellationToken, Task> callback, - CountWaiter taskWaiter, - IMessageSerializer serializer, - ILogger> logger) - { - _subscriptionContext = subscriptionContext; - _callback = callback; - _taskWaiter = taskWaiter; - _serializer = serializer; - _logger = logger; - } - - public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory body) + public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, + string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) { - _logger.QueueMessageReceived(deliveryTag, consumerTag, exchange, redelivered); + logger.QueueMessageReceived(deliveryTag, consumerTag, exchange, redelivered); return HandleBasicDeliverAsync(deliveryTag, body); } @@ -37,7 +24,7 @@ private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory(body.Span); + var message = serializer.Deserialize(body.Span); if (message is null) throw new RabbitMqClientException("Failed to deserialize message"); @@ -47,22 +34,22 @@ private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemoryIf true, acknowledge all outstanding delivery tags up to and including the delivery tag /// The cancellation token /// - public Task AcknowledgeAsync(bool multiple = false, CancellationToken cancellationToken = default) + public ValueTask AcknowledgeAsync(bool multiple = false, CancellationToken cancellationToken = default) { return _subscriptionContext.AcknowledgeAsync( deliveryTag: DeliveryTag, @@ -59,7 +59,7 @@ public Task AcknowledgeAsync(bool multiple = false, CancellationToken cancellati /// If true, requeue the delivery (or multiple deliveries if is true)) with the specified delivery tag /// The cancellation token /// - public Task NegativeAcknowledgeAsync(bool multiple = false, bool requeue = true, CancellationToken cancellationToken = default) + public ValueTask NegativeAcknowledgeAsync(bool multiple = false, bool requeue = true, CancellationToken cancellationToken = default) { return _subscriptionContext.NegativeAcknowledgeAsync( deliveryTag: DeliveryTag, diff --git a/src/Queues/RabbitMq/src/RabbitMqClient.cs b/src/Queues/RabbitMq/src/RabbitMqClient.cs index db542bb..c93cfc5 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClient.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClient.cs @@ -48,10 +48,15 @@ public async Task EnqueueAsync(string exchange, TData data, EnqueueOption using var channel = await GetChannelAsync(cancellationToken); - var properties = channel.CreateBasicProperties(); - properties.Persistent = options.Persistent; - - channel.BasicPublish(exchange, options.RoutingKey, properties, bytes); + var properties = new BasicProperties {Persistent = options.Persistent}; + + await channel.BasicPublishAsync( + exchange: exchange, + routingKey: options.RoutingKey, + mandatory: true, + basicProperties: properties, + body: bytes, + cancellationToken: cancellationToken); } /// @@ -91,12 +96,13 @@ public async Task SubscribeAsync(string queue, _options.LoggerFactory.CreateLogger>() ); - channel.BasicQos(0, options.PrefetchCount, false); + await channel.BasicQosAsync(0, options.PrefetchCount, false, cancellationToken); - var consumerTag = channel.BasicConsume( + var consumerTag = await channel.BasicConsumeAsync( queue: queue, autoAck: options.AutoAcknowledge, - consumer: consumer); + consumer: consumer, + cancellationToken: cancellationToken); subContext.SetConsumerTag(consumerTag); @@ -161,22 +167,16 @@ private async ValueTask ConnectSlowAsync(CancellationToken token) _logger.ConnectingToRabbitMq(); // Create a new connection - var connection = _connectionFactory.CreateConnection(); + var connection = await _connectionFactory.CreateConnectionAsync(token); // Setup logging - connection.CallbackException += (_, args) => - _logger.LogError(args.Exception, "Exception thrown in RabbitMQ callback"); + connection.CallbackException += (_, args) => _logger.LogError(args.Exception, "Exception thrown in RabbitMQ callback"); connection.ConnectionBlocked += (_, _) => _logger.LogDebug("RabbitMQ connection blocked"); connection.ConnectionUnblocked += (_, _) => _logger.LogDebug("RabbitMQ connection unblocked"); connection.ConnectionShutdown += (_, _) => _logger.LogDebug("RabbitMQ connection shutdown"); - - if (connection is IAutorecoveringConnection autorecoveringConnection) - { - autorecoveringConnection.RecoveringConsumer += (_, _) => _logger.LogDebug("RecoveringConsumer"); - autorecoveringConnection.RecoverySucceeded += (_, _) => _logger.LogDebug("RecoverySucceeded"); - autorecoveringConnection.ConnectionRecoveryError += (_, args) => - _logger.LogError(args.Exception, "ConnectionRecoveryError"); - } + connection.RecoveringConsumer += (_, _) => _logger.LogDebug("RecoveringConsumer"); + connection.RecoverySucceeded += (_, _) => _logger.LogDebug("RecoverySucceeded"); + connection.ConnectionRecoveryError += (_, args) => _logger.LogError(args.Exception, "ConnectionRecoveryError"); _connection = connection; @@ -190,10 +190,10 @@ private async ValueTask ConnectSlowAsync(CancellationToken token) } } - private async ValueTask GetChannelAsync(CancellationToken cancellationToken = default) + private async ValueTask GetChannelAsync(CancellationToken cancellationToken = default) { var connection = await GetConnectionAsync(cancellationToken); - return connection.CreateModel(); + return await connection.CreateChannelAsync(cancellationToken); } private void CheckDisposed() @@ -208,8 +208,8 @@ private static ConnectionFactory CreateConnectionFactory(RabbitMqClientOptions o { HostName = options.Host, Port = options.Port, - DispatchConsumersAsync = true, - AutomaticRecoveryEnabled = true + AutomaticRecoveryEnabled = true, + ConsumerDispatchConcurrency = options.ConsumerDispatchConcurrency }; // Username diff --git a/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs b/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs index fc378e6..2336d66 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs @@ -58,5 +58,14 @@ public class RabbitMqClientOptions : IOptions /// public ILoggerFactory LoggerFactory { get; set; } = NullLoggerFactory.Instance; + /// + /// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one, + /// tasks will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading. + /// Defaults to 1. + /// + /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. + /// In addition to that consumers need to be thread/concurrency safe. + public int ConsumerDispatchConcurrency { get; set; } = 1; + public RabbitMqClientOptions Value => this; } diff --git a/src/Queues/RabbitMq/src/SubscriptionContext.cs b/src/Queues/RabbitMq/src/SubscriptionContext.cs index ab7c9c6..4bab138 100644 --- a/src/Queues/RabbitMq/src/SubscriptionContext.cs +++ b/src/Queues/RabbitMq/src/SubscriptionContext.cs @@ -1,5 +1,6 @@ namespace ClickView.GoodStuff.Queues.RabbitMq; +using System.Diagnostics; using Internal; using Microsoft.Extensions.Logging; using RabbitMQ.Client; @@ -7,7 +8,7 @@ namespace ClickView.GoodStuff.Queues.RabbitMq; public class SubscriptionContext : IAsyncDisposable { private readonly string _queueName; - private readonly IModel _channel; + private readonly IChannel _channel; private readonly ActiveSubscriptions _subscriptions; private readonly CountWaiter _taskWaiter; private readonly ILogger _logger; @@ -16,7 +17,7 @@ public class SubscriptionContext : IAsyncDisposable internal SubscriptionContext( string queueName, - IModel channel, + IChannel channel, ActiveSubscriptions subscriptions, CountWaiter taskWaiter, ILogger logger) @@ -42,18 +43,17 @@ internal SubscriptionContext( /// If true, acknowledge all outstanding delivery tags up to and including the delivery tag /// The cancellation token /// - public Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, + public ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default) { CheckDisposed(); _logger.SendingAcknowledge(deliveryTag); - _channel.BasicAck( + return _channel.BasicAckAsync( deliveryTag: deliveryTag, - multiple: multiple); - - return Task.CompletedTask; + multiple: multiple, + cancellationToken: cancellationToken); } /// @@ -64,19 +64,18 @@ public Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, /// If true, requeue the delivery (or multiple deliveries if is true)) with the specified delivery tag /// The cancellation token /// - public Task NegativeAcknowledgeAsync(ulong deliveryTag, bool multiple = false, bool requeue = true, + public ValueTask NegativeAcknowledgeAsync(ulong deliveryTag, bool multiple = false, bool requeue = true, CancellationToken cancellationToken = default) { CheckDisposed(); _logger.SendingNegativeAcknowledge(deliveryTag); - _channel.BasicNack( + return _channel.BasicNackAsync( deliveryTag: deliveryTag, multiple: multiple, - requeue: requeue); - - return Task.CompletedTask; + requeue: requeue, + cancellationToken: cancellationToken); } /// @@ -89,7 +88,8 @@ public async ValueTask DisposeAsync() // Unsubscribe from the queue to stop receiving any new messages _logger.LogDebug("Unsubscribing from queue {QueueName}", _queueName); - _channel.BasicCancel(_consumerTag); + Debug.Assert(_consumerTag != null); + await _channel.BasicCancelAsync(_consumerTag); // Wait for all tasks to complete before closing the connection // If we close the connection first then the tasks cant ack From dc91317165fb75a9122969693cd36deda78bc8ab Mon Sep 17 00:00:00 2001 From: MrSmoke <709976+MrSmoke@users.noreply.github.com> Date: Tue, 10 Sep 2024 18:50:43 +1000 Subject: [PATCH 2/5] Small cleanups --- src/Queues/RabbitMq/src/Internal/CountWaiter.cs | 2 +- src/Queues/RabbitMq/src/MessageWrapper.cs | 15 ++++----------- src/Queues/RabbitMq/src/RabbitMqClient.cs | 2 +- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/Queues/RabbitMq/src/Internal/CountWaiter.cs b/src/Queues/RabbitMq/src/Internal/CountWaiter.cs index 2a53032..9700fcd 100644 --- a/src/Queues/RabbitMq/src/Internal/CountWaiter.cs +++ b/src/Queues/RabbitMq/src/Internal/CountWaiter.cs @@ -6,7 +6,7 @@ namespace ClickView.GoodStuff.Queues.RabbitMq.Internal; internal class CountWaiter { private readonly object _pendingLock = new(); - private List _pendingAwaiters = new(); + private readonly List _pendingAwaiters = new(); private int _count; public Task WaitAsync() diff --git a/src/Queues/RabbitMq/src/MessageWrapper.cs b/src/Queues/RabbitMq/src/MessageWrapper.cs index e8f8e64..5302016 100644 --- a/src/Queues/RabbitMq/src/MessageWrapper.cs +++ b/src/Queues/RabbitMq/src/MessageWrapper.cs @@ -1,17 +1,10 @@ namespace ClickView.GoodStuff.Queues.RabbitMq; -public class MessageWrapper +public class MessageWrapper(string id, TData data, long timestamp) { - public MessageWrapper(string id, TData data, long timestamp) - { - Id = id; - Data = data; - Timestamp = timestamp; - } - - public string Id { get; init; } - public TData Data { get; init; } - public long Timestamp { get; init; } + public string Id { get; init; } = id; + public TData Data { get; init; } = data; + public long Timestamp { get; init; } = timestamp; internal static MessageWrapper New(TData data) => new ( diff --git a/src/Queues/RabbitMq/src/RabbitMqClient.cs b/src/Queues/RabbitMq/src/RabbitMqClient.cs index c93cfc5..aaeceb4 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClient.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClient.cs @@ -190,7 +190,7 @@ private async ValueTask ConnectSlowAsync(CancellationToken token) } } - private async ValueTask GetChannelAsync(CancellationToken cancellationToken = default) + private async Task GetChannelAsync(CancellationToken cancellationToken = default) { var connection = await GetConnectionAsync(cancellationToken); return await connection.CreateChannelAsync(cancellationToken); From 3478b888ca11019d655a69c6cef1216b26529e55 Mon Sep 17 00:00:00 2001 From: MrSmoke <709976+MrSmoke@users.noreply.github.com> Date: Thu, 7 Nov 2024 10:53:20 +1100 Subject: [PATCH 3/5] Update RabbitMQ client to release version --- .../RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj b/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj index faa69f5..9d8e610 100644 --- a/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj +++ b/src/Queues/RabbitMq/src/ClickView.GoodStuff.Queues.RabbitMq.csproj @@ -9,7 +9,7 @@ - + From 96b6057fa3f8597a9d5b9c257476c9967d52dcbe Mon Sep 17 00:00:00 2001 From: MrSmoke <709976+MrSmoke@users.noreply.github.com> Date: Thu, 7 Nov 2024 11:29:23 +1100 Subject: [PATCH 4/5] Changes required for rabbitmq 7.0.0 --- .../RabbitMq/src/Internal/ConnectionLogger.cs | 81 +++++++++++++++++++ .../src/Internal/RabbitMqCallbackConsumer.cs | 12 +-- src/Queues/RabbitMq/src/RabbitMqClient.cs | 13 +-- .../RabbitMq/src/RabbitMqClientOptions.cs | 2 +- 4 files changed, 93 insertions(+), 15 deletions(-) create mode 100644 src/Queues/RabbitMq/src/Internal/ConnectionLogger.cs diff --git a/src/Queues/RabbitMq/src/Internal/ConnectionLogger.cs b/src/Queues/RabbitMq/src/Internal/ConnectionLogger.cs new file mode 100644 index 0000000..b49ed74 --- /dev/null +++ b/src/Queues/RabbitMq/src/Internal/ConnectionLogger.cs @@ -0,0 +1,81 @@ +namespace ClickView.GoodStuff.Queues.RabbitMq.Internal; + +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +internal class ConnectionLogger +{ + private readonly ILogger _logger; + + public ConnectionLogger(IConnection connection, ILogger logger) + { + _logger = logger; + + connection.CallbackExceptionAsync += CallbackExceptionAsync; + connection.ConnectionShutdownAsync += ConnectionOnConnectionShutdownAsync; + connection.RecoverySucceededAsync += ConnectionOnRecoverySucceededAsync; + connection.ConnectionRecoveryErrorAsync += ConnectionOnConnectionRecoveryErrorAsync; + connection.ConsumerTagChangeAfterRecoveryAsync += ConnectionOnConsumerTagChangeAfterRecoveryAsync; + connection.QueueNameChangedAfterRecoveryAsync += ConnectionOnQueueNameChangedAfterRecoveryAsync; + connection.RecoveringConsumerAsync += ConnectionOnRecoveringConsumerAsync; + connection.ConnectionBlockedAsync += ConnectionOnConnectionBlockedAsync; + connection.ConnectionUnblockedAsync += ConnectionOnConnectionUnblockedAsync; + } + + private Task CallbackExceptionAsync(object sender, CallbackExceptionEventArgs args) + { + _logger.LogError(args.Exception, "Exception thrown in RabbitMQ callback"); + return Task.CompletedTask; + } + + private Task ConnectionOnConnectionShutdownAsync(object sender, ShutdownEventArgs args) + { + _logger.LogDebug("RabbitMQ connection shutdown"); + return Task.CompletedTask; + } + + private Task ConnectionOnRecoverySucceededAsync(object sender, AsyncEventArgs args) + { + _logger.LogDebug("RabbitMQ connection recovery succeeded"); + return Task.CompletedTask; + } + + private Task ConnectionOnConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs args) + { + _logger.LogDebug("RabbitMQ connection recovery error"); + return Task.CompletedTask; + } + + private Task ConnectionOnConsumerTagChangeAfterRecoveryAsync(object sender, + ConsumerTagChangedAfterRecoveryEventArgs args) + { + _logger.LogDebug("RabbitMQ connection consumer tag changed after recovery"); + return Task.CompletedTask; + } + + private Task ConnectionOnQueueNameChangedAfterRecoveryAsync(object sender, + QueueNameChangedAfterRecoveryEventArgs args) + { + _logger.LogDebug("RabbitMQ connection queue name changed after recovery"); + return Task.CompletedTask; + } + + private Task ConnectionOnRecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs args) + { + _logger.LogDebug("RabbitMQ connection recovering consumer"); + return Task.CompletedTask; + } + + private Task ConnectionOnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs args) + { + _logger.LogDebug("RabbitMQ connection blocked"); + return Task.CompletedTask; + } + + private Task ConnectionOnConnectionUnblockedAsync(object sender, AsyncEventArgs args) + { + _logger.LogDebug("RabbitMQ connection unblocked"); + return Task.CompletedTask; + } +} diff --git a/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs b/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs index c7bb4a9..839b324 100644 --- a/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs +++ b/src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs @@ -5,22 +5,24 @@ namespace ClickView.GoodStuff.Queues.RabbitMq.Internal; using Serialization; internal class RabbitMqCallbackConsumer( + IChannel channel, SubscriptionContext subscriptionContext, Func, CancellationToken, Task> callback, CountWaiter taskWaiter, IMessageSerializer serializer, ILogger> logger) - : AsyncDefaultBasicConsumer + : AsyncDefaultBasicConsumer(channel) { public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered, - string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body) + string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory body, + CancellationToken cancellationToken = default) { logger.QueueMessageReceived(deliveryTag, consumerTag, exchange, redelivered); - return HandleBasicDeliverAsync(deliveryTag, body); + return HandleBasicDeliverAsync(deliveryTag, body, cancellationToken); } - private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory body) + private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory body, CancellationToken cancellationToken) { try { @@ -40,7 +42,7 @@ private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory(string exchange, TData data, EnqueueOption var message = MessageWrapper.New(data); var bytes = _options.Serializer.Serialize(message); - using var channel = await GetChannelAsync(cancellationToken); + await using var channel = await GetChannelAsync(cancellationToken); var properties = new BasicProperties {Persistent = options.Persistent}; @@ -89,6 +89,7 @@ public async Task SubscribeAsync(string queue, logger: _options.LoggerFactory.CreateLogger()); var consumer = new RabbitMqCallbackConsumer( + channel, subContext, callback, shutdownTaskWaiter, @@ -170,13 +171,7 @@ private async ValueTask ConnectSlowAsync(CancellationToken token) var connection = await _connectionFactory.CreateConnectionAsync(token); // Setup logging - connection.CallbackException += (_, args) => _logger.LogError(args.Exception, "Exception thrown in RabbitMQ callback"); - connection.ConnectionBlocked += (_, _) => _logger.LogDebug("RabbitMQ connection blocked"); - connection.ConnectionUnblocked += (_, _) => _logger.LogDebug("RabbitMQ connection unblocked"); - connection.ConnectionShutdown += (_, _) => _logger.LogDebug("RabbitMQ connection shutdown"); - connection.RecoveringConsumer += (_, _) => _logger.LogDebug("RecoveringConsumer"); - connection.RecoverySucceeded += (_, _) => _logger.LogDebug("RecoverySucceeded"); - connection.ConnectionRecoveryError += (_, args) => _logger.LogError(args.Exception, "ConnectionRecoveryError"); + _ = new ConnectionLogger(connection, _logger); _connection = connection; @@ -193,7 +188,7 @@ private async ValueTask ConnectSlowAsync(CancellationToken token) private async Task GetChannelAsync(CancellationToken cancellationToken = default) { var connection = await GetConnectionAsync(cancellationToken); - return await connection.CreateChannelAsync(cancellationToken); + return await connection.CreateChannelAsync(cancellationToken: cancellationToken); } private void CheckDisposed() diff --git a/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs b/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs index 2336d66..7911335 100644 --- a/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs +++ b/src/Queues/RabbitMq/src/RabbitMqClientOptions.cs @@ -65,7 +65,7 @@ public class RabbitMqClientOptions : IOptions /// /// For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them. /// In addition to that consumers need to be thread/concurrency safe. - public int ConsumerDispatchConcurrency { get; set; } = 1; + public ushort ConsumerDispatchConcurrency { get; set; } = 1; public RabbitMqClientOptions Value => this; } From d3860d31be1c580e3a36ffcc5aec54bf0cfb393c Mon Sep 17 00:00:00 2001 From: MrSmoke <709976+MrSmoke@users.noreply.github.com> Date: Tue, 26 Nov 2024 16:20:10 +1100 Subject: [PATCH 5/5] Update BaseQueueHostedService.AcknowledgeAsync to return ValueTask --- src/Hosting/Queue/src/BaseQueueHostedService.cs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/Hosting/Queue/src/BaseQueueHostedService.cs b/src/Hosting/Queue/src/BaseQueueHostedService.cs index a7714c5..ef8ce5d 100644 --- a/src/Hosting/Queue/src/BaseQueueHostedService.cs +++ b/src/Hosting/Queue/src/BaseQueueHostedService.cs @@ -161,7 +161,7 @@ protected abstract Task SubscribeAsync(IQueueClient queueCl /// If true, acknowledge all outstanding delivery tags up to and including the delivery tag /// /// - protected async Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default) + protected ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default) { CheckDisposed(); @@ -169,13 +169,11 @@ protected async Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, if (subContext is null) throw new InvalidOperationException("Cannot call acknowledge before starting the worker"); - if (!subContext.IsOpen) - { - Logger.AcknowledgeFailureChannelNotOpen(deliveryTag); - return; - } + if (subContext.IsOpen) + return subContext.AcknowledgeAsync(deliveryTag, multiple, cancellationToken); - await subContext.AcknowledgeAsync(deliveryTag, multiple, cancellationToken); + Logger.AcknowledgeFailureChannelNotOpen(deliveryTag); + return ValueTask.CompletedTask; } private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory)