Skip to content

Commit

Permalink
Merge pull request #218 from clickviewapp/rabbitmq-7
Browse files Browse the repository at this point in the history
Rabbitmq 7.0.0
  • Loading branch information
Prads- authored Nov 28, 2024
2 parents 85a0f15 + d3860d3 commit 49f9274
Show file tree
Hide file tree
Showing 11 changed files with 153 additions and 116 deletions.
4 changes: 2 additions & 2 deletions src/Hosting/Queue/src/BaseQueueHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected abstract Task<SubscriptionContext> SubscribeAsync(IQueueClient queueCl
/// <param name="multiple">If true, acknowledge all outstanding delivery tags up to and including the delivery tag</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
protected Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default)
protected ValueTask AcknowledgeAsync(ulong deliveryTag, bool multiple = false, CancellationToken cancellationToken = default)
{
CheckDisposed();

Expand All @@ -173,7 +173,7 @@ protected Task AcknowledgeAsync(ulong deliveryTag, bool multiple = false, Cancel
return subContext.AcknowledgeAsync(deliveryTag, multiple, cancellationToken);

Logger.AcknowledgeFailureChannelNotOpen(deliveryTag);
return Task.CompletedTask;
return ValueTask.CompletedTask;
}

private static RabbitMqClientOptions CreateOptions(BaseQueueHostedServiceOptions options, ILoggerFactory loggerFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="9.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
<PackageReference Include="RabbitMQ.Client" Version="7.0.0" />
<PackageReference Include="System.Text.Json" Version="9.0.0" />
</ItemGroup>

Expand Down
81 changes: 81 additions & 0 deletions src/Queues/RabbitMq/src/Internal/ConnectionLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
namespace ClickView.GoodStuff.Queues.RabbitMq.Internal;

using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

internal class ConnectionLogger
{
private readonly ILogger _logger;

public ConnectionLogger(IConnection connection, ILogger logger)
{
_logger = logger;

connection.CallbackExceptionAsync += CallbackExceptionAsync;
connection.ConnectionShutdownAsync += ConnectionOnConnectionShutdownAsync;
connection.RecoverySucceededAsync += ConnectionOnRecoverySucceededAsync;
connection.ConnectionRecoveryErrorAsync += ConnectionOnConnectionRecoveryErrorAsync;
connection.ConsumerTagChangeAfterRecoveryAsync += ConnectionOnConsumerTagChangeAfterRecoveryAsync;
connection.QueueNameChangedAfterRecoveryAsync += ConnectionOnQueueNameChangedAfterRecoveryAsync;
connection.RecoveringConsumerAsync += ConnectionOnRecoveringConsumerAsync;
connection.ConnectionBlockedAsync += ConnectionOnConnectionBlockedAsync;
connection.ConnectionUnblockedAsync += ConnectionOnConnectionUnblockedAsync;
}

private Task CallbackExceptionAsync(object sender, CallbackExceptionEventArgs args)
{
_logger.LogError(args.Exception, "Exception thrown in RabbitMQ callback");
return Task.CompletedTask;
}

private Task ConnectionOnConnectionShutdownAsync(object sender, ShutdownEventArgs args)
{
_logger.LogDebug("RabbitMQ connection shutdown");
return Task.CompletedTask;
}

private Task ConnectionOnRecoverySucceededAsync(object sender, AsyncEventArgs args)
{
_logger.LogDebug("RabbitMQ connection recovery succeeded");
return Task.CompletedTask;
}

private Task ConnectionOnConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs args)
{
_logger.LogDebug("RabbitMQ connection recovery error");
return Task.CompletedTask;
}

private Task ConnectionOnConsumerTagChangeAfterRecoveryAsync(object sender,
ConsumerTagChangedAfterRecoveryEventArgs args)
{
_logger.LogDebug("RabbitMQ connection consumer tag changed after recovery");
return Task.CompletedTask;
}

private Task ConnectionOnQueueNameChangedAfterRecoveryAsync(object sender,
QueueNameChangedAfterRecoveryEventArgs args)
{
_logger.LogDebug("RabbitMQ connection queue name changed after recovery");
return Task.CompletedTask;
}

private Task ConnectionOnRecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs args)
{
_logger.LogDebug("RabbitMQ connection recovering consumer");
return Task.CompletedTask;
}

private Task ConnectionOnConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs args)
{
_logger.LogDebug("RabbitMQ connection blocked");
return Task.CompletedTask;
}

private Task ConnectionOnConnectionUnblockedAsync(object sender, AsyncEventArgs args)
{
_logger.LogDebug("RabbitMQ connection unblocked");
return Task.CompletedTask;
}
}
2 changes: 1 addition & 1 deletion src/Queues/RabbitMq/src/Internal/CountWaiter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace ClickView.GoodStuff.Queues.RabbitMq.Internal;
internal class CountWaiter
{
private readonly object _pendingLock = new();
private List<TaskCompletionSource> _pendingAwaiters = new();
private readonly List<TaskCompletionSource> _pendingAwaiters = new();
private int _count;

public Task WaitAsync()
Expand Down
51 changes: 20 additions & 31 deletions src/Queues/RabbitMq/src/Internal/RabbitMqCallbackConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,29 @@ namespace ClickView.GoodStuff.Queues.RabbitMq.Internal;
using RabbitMQ.Client;
using Serialization;

internal class RabbitMqCallbackConsumer<TData> : AsyncDefaultBasicConsumer
internal class RabbitMqCallbackConsumer<TData>(
IChannel channel,
SubscriptionContext subscriptionContext,
Func<MessageContext<TData>, CancellationToken, Task> callback,
CountWaiter taskWaiter,
IMessageSerializer serializer,
ILogger<RabbitMqCallbackConsumer<TData>> logger)
: AsyncDefaultBasicConsumer(channel)
{
private readonly SubscriptionContext _subscriptionContext;
private readonly Func<MessageContext<TData>, CancellationToken, Task> _callback;
private readonly CountWaiter _taskWaiter;
private readonly IMessageSerializer _serializer;
private readonly ILogger<RabbitMqCallbackConsumer<TData>> _logger;

public RabbitMqCallbackConsumer(SubscriptionContext subscriptionContext,
Func<MessageContext<TData>, CancellationToken, Task> callback,
CountWaiter taskWaiter,
IMessageSerializer serializer,
ILogger<RabbitMqCallbackConsumer<TData>> logger)
{
_subscriptionContext = subscriptionContext;
_callback = callback;
_taskWaiter = taskWaiter;
_serializer = serializer;
_logger = logger;
}

public override Task HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IBasicProperties properties, ReadOnlyMemory<byte> body)
public override Task HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag, bool redelivered,
string exchange, string routingKey, IReadOnlyBasicProperties properties, ReadOnlyMemory<byte> body,
CancellationToken cancellationToken = default)
{
_logger.QueueMessageReceived(deliveryTag, consumerTag, exchange, redelivered);
logger.QueueMessageReceived(deliveryTag, consumerTag, exchange, redelivered);

return HandleBasicDeliverAsync(deliveryTag, body);
return HandleBasicDeliverAsync(deliveryTag, body, cancellationToken);
}

private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory<byte> body)
private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
{
try
{
var message = _serializer.Deserialize<TData>(body.Span);
var message = serializer.Deserialize<TData>(body.Span);

if (message is null)
throw new RabbitMqClientException("Failed to deserialize message");
Expand All @@ -47,22 +36,22 @@ private async Task HandleBasicDeliverAsync(ulong deliveryTag, ReadOnlyMemory<byt
deliveryTag: deliveryTag,
timestamp: DateTimeOffset.FromUnixTimeSeconds(message.Timestamp).UtcDateTime,
id: message.Id,
subscriptionContext: _subscriptionContext
subscriptionContext: subscriptionContext
);

_taskWaiter.Increment();
taskWaiter.Increment();
try
{
await _callback(context, CancellationToken.None);
await callback(context, cancellationToken);
}
finally
{
_taskWaiter.Decrement();
taskWaiter.Decrement();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Unhandled exception when processing queue message");
logger.LogError(ex, "Unhandled exception when processing queue message");
throw;
}
}
Expand Down
30 changes: 0 additions & 30 deletions src/Queues/RabbitMq/src/Internal/RabbitMqClientShims.cs

This file was deleted.

4 changes: 2 additions & 2 deletions src/Queues/RabbitMq/src/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ internal MessageContext(TData data, ulong deliveryTag, DateTime timestamp, strin
/// <param name="multiple">If true, acknowledge all outstanding delivery tags up to and including the delivery tag</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns></returns>
public Task AcknowledgeAsync(bool multiple = false, CancellationToken cancellationToken = default)
public ValueTask AcknowledgeAsync(bool multiple = false, CancellationToken cancellationToken = default)
{
return _subscriptionContext.AcknowledgeAsync(
deliveryTag: DeliveryTag,
Expand All @@ -59,7 +59,7 @@ public Task AcknowledgeAsync(bool multiple = false, CancellationToken cancellati
/// <param name="requeue">If true, requeue the delivery (or multiple deliveries if <paramref name="multiple"/> is true)) with the specified delivery tag</param>
/// <param name="cancellationToken">The cancellation token</param>
/// <returns></returns>
public Task NegativeAcknowledgeAsync(bool multiple = false, bool requeue = true, CancellationToken cancellationToken = default)
public ValueTask NegativeAcknowledgeAsync(bool multiple = false, bool requeue = true, CancellationToken cancellationToken = default)
{
return _subscriptionContext.NegativeAcknowledgeAsync(
deliveryTag: DeliveryTag,
Expand Down
15 changes: 4 additions & 11 deletions src/Queues/RabbitMq/src/MessageWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
namespace ClickView.GoodStuff.Queues.RabbitMq;

public class MessageWrapper<TData>
public class MessageWrapper<TData>(string id, TData data, long timestamp)
{
public MessageWrapper(string id, TData data, long timestamp)
{
Id = id;
Data = data;
Timestamp = timestamp;
}

public string Id { get; init; }
public TData Data { get; init; }
public long Timestamp { get; init; }
public string Id { get; init; } = id;
public TData Data { get; init; } = data;
public long Timestamp { get; init; } = timestamp;

internal static MessageWrapper<TData> New(TData data) => new
(
Expand Down
45 changes: 20 additions & 25 deletions src/Queues/RabbitMq/src/RabbitMqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ public async Task EnqueueAsync<TData>(string exchange, TData data, EnqueueOption
var message = MessageWrapper<TData>.New(data);
var bytes = _options.Serializer.Serialize(message);

using var channel = await GetChannelAsync(cancellationToken);
await using var channel = await GetChannelAsync(cancellationToken);

var properties = channel.CreateBasicProperties();
properties.Persistent = options.Persistent;
var properties = new BasicProperties {Persistent = options.Persistent};

channel.BasicPublish(exchange, options.RoutingKey, properties, bytes);
await channel.BasicPublishAsync(
exchange: exchange,
routingKey: options.RoutingKey,
mandatory: true,
basicProperties: properties,
body: bytes,
cancellationToken: cancellationToken);
}

/// <inheritdoc />
Expand Down Expand Up @@ -84,19 +89,21 @@ public async Task<SubscriptionContext> SubscribeAsync<TData>(string queue,
logger: _options.LoggerFactory.CreateLogger<SubscriptionContext>());

var consumer = new RabbitMqCallbackConsumer<TData>(
channel,
subContext,
callback,
shutdownTaskWaiter,
_options.Serializer,
_options.LoggerFactory.CreateLogger<RabbitMqCallbackConsumer<TData>>()
);

channel.BasicQos(0, options.PrefetchCount, false);
await channel.BasicQosAsync(0, options.PrefetchCount, false, cancellationToken);

var consumerTag = channel.BasicConsume(
var consumerTag = await channel.BasicConsumeAsync(
queue: queue,
autoAck: options.AutoAcknowledge,
consumer: consumer);
consumer: consumer,
cancellationToken: cancellationToken);

subContext.SetConsumerTag(consumerTag);

Expand Down Expand Up @@ -161,22 +168,10 @@ private async ValueTask<IConnection> ConnectSlowAsync(CancellationToken token)
_logger.ConnectingToRabbitMq();

// Create a new connection
var connection = _connectionFactory.CreateConnection();
var connection = await _connectionFactory.CreateConnectionAsync(token);

// Setup logging
connection.CallbackException += (_, args) =>
_logger.LogError(args.Exception, "Exception thrown in RabbitMQ callback");
connection.ConnectionBlocked += (_, _) => _logger.LogDebug("RabbitMQ connection blocked");
connection.ConnectionUnblocked += (_, _) => _logger.LogDebug("RabbitMQ connection unblocked");
connection.ConnectionShutdown += (_, _) => _logger.LogDebug("RabbitMQ connection shutdown");

if (connection is IAutorecoveringConnection autorecoveringConnection)
{
autorecoveringConnection.RecoveringConsumer += (_, _) => _logger.LogDebug("RecoveringConsumer");
autorecoveringConnection.RecoverySucceeded += (_, _) => _logger.LogDebug("RecoverySucceeded");
autorecoveringConnection.ConnectionRecoveryError += (_, args) =>
_logger.LogError(args.Exception, "ConnectionRecoveryError");
}
_ = new ConnectionLogger(connection, _logger);

_connection = connection;

Expand All @@ -190,10 +185,10 @@ private async ValueTask<IConnection> ConnectSlowAsync(CancellationToken token)
}
}

private async ValueTask<IModel> GetChannelAsync(CancellationToken cancellationToken = default)
private async Task<IChannel> GetChannelAsync(CancellationToken cancellationToken = default)
{
var connection = await GetConnectionAsync(cancellationToken);
return connection.CreateModel();
return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
}

private void CheckDisposed()
Expand All @@ -208,8 +203,8 @@ private static ConnectionFactory CreateConnectionFactory(RabbitMqClientOptions o
{
HostName = options.Host,
Port = options.Port,
DispatchConsumersAsync = true,
AutomaticRecoveryEnabled = true
AutomaticRecoveryEnabled = true,
ConsumerDispatchConcurrency = options.ConsumerDispatchConcurrency
};

// Username
Expand Down
9 changes: 9 additions & 0 deletions src/Queues/RabbitMq/src/RabbitMqClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,14 @@ public class RabbitMqClientOptions : IOptions<RabbitMqClientOptions>
/// </summary>
public ILoggerFactory LoggerFactory { get; set; } = NullLoggerFactory.Instance;

/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one,
/// tasks will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
/// Defaults to 1.
/// </summary>
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
public ushort ConsumerDispatchConcurrency { get; set; } = 1;

public RabbitMqClientOptions Value => this;
}
Loading

0 comments on commit 49f9274

Please sign in to comment.