Skip to content

Commit

Permalink
BrighterCommandGH-3386 Fixes unit test & confirm publish
Browse files Browse the repository at this point in the history
  • Loading branch information
lillo42 committed Dec 4, 2024
1 parent bdd5376 commit 407091e
Show file tree
Hide file tree
Showing 12 changed files with 65 additions and 54 deletions.
24 changes: 15 additions & 9 deletions src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageGateway.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,37 +122,43 @@ private void ConnectWithRetry(ChannelName queueName, OnMissingChannel makeExchan
new Dictionary<string, object> { { "queueName", queueName.Value } });
}

protected virtual async Task ConnectToBroker(OnMissingChannel makeExchange,
protected virtual void ConnectToBroker(OnMissingChannel makeExchange) =>
ConnectToBrokerAsync(makeExchange).GetAwaiter().GetResult();

protected virtual async Task ConnectToBrokerAsync(OnMissingChannel makeExchange,
CancellationToken cancellationToken = default)
{
if (Channel == null || Channel.IsClosed)
{
var connection =
new RmqMessageGatewayConnectionPool(Connection.Name, Connection.Heartbeat).GetConnection(
_connectionFactory);
var connection = await new RmqMessageGatewayConnectionPool(Connection.Name, Connection.Heartbeat)
.GetConnectionAsync(_connectionFactory, cancellationToken);

connection.ConnectionBlockedAsync += HandleBlocked;
connection.ConnectionUnblockedAsync += HandleUnBlocked;
connection.ConnectionBlockedAsync += HandleBlockedAsync;
connection.ConnectionUnblockedAsync += HandleUnBlockedAsync;

s_logger.LogDebug("RMQMessagingGateway: Opening channel to Rabbit MQ on {URL}",
Connection.AmpqUri.GetSanitizedUri());

Channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);
Channel = await connection.CreateChannelAsync(
new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true),
cancellationToken);

//desired state configuration of the exchange
await Channel.DeclareExchangeForConnection(Connection, makeExchange, cancellationToken: cancellationToken);
}
}

private Task HandleBlocked(object sender, ConnectionBlockedEventArgs args)
private Task HandleBlockedAsync(object sender, ConnectionBlockedEventArgs args)
{
s_logger.LogWarning("RMQMessagingGateway: Subscription to {URL} blocked. Reason: {ErrorMessage}",
Connection.AmpqUri.GetSanitizedUri(), args.Reason);

return Task.CompletedTask;
}

private Task HandleUnBlocked(object sender, AsyncEventArgs args)
private Task HandleUnBlockedAsync(object sender, AsyncEventArgs args)
{
s_logger.LogInformation("RMQMessagingGateway: Subscription to {URL} unblocked",
Connection.AmpqUri.GetSanitizedUri());
Expand Down
68 changes: 33 additions & 35 deletions tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/TestHelpers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#region Licence

/* The MIT License (MIT)
Copyright © 2014 Ian Cooper <[email protected]>
Expand All @@ -22,53 +23,50 @@ THE SOFTWARE. */

#endregion

using System;
using System.Linq;
using System.Threading.Tasks;
using Paramore.Brighter.MessagingGateway.RMQ;
using RabbitMQ.Client;

namespace Paramore.Brighter.RMQ.Tests.MessagingGateway
namespace Paramore.Brighter.RMQ.Tests.MessagingGateway;

internal class QueueFactory
{
private readonly RmqMessagingGatewayConnection _connection;
private readonly ChannelName _channelName;
private readonly RoutingKeys _routingKeys;

internal class QueueFactory
public QueueFactory(RmqMessagingGatewayConnection connection, ChannelName channelName, RoutingKeys routingKeys)
{
private readonly RmqMessagingGatewayConnection _connection;
private readonly ChannelName _channelName;
private readonly RoutingKeys _routingKeys;
_connection = connection;
_channelName = channelName;
_routingKeys = routingKeys;
}

public QueueFactory(RmqMessagingGatewayConnection connection, ChannelName channelName, RoutingKeys routingKeys)
{
_connection = connection;
_channelName = channelName;
_routingKeys = routingKeys;
}
public async Task Create()
{
var connectionFactory = new ConnectionFactory { Uri = _connection.AmpqUri.Uri };
await using var connection = await connectionFactory.CreateConnectionAsync();
await using var channel =
await connection.CreateChannelAsync(new CreateChannelOptions(
publisherConfirmationsEnabled: true,
publisherConfirmationTrackingEnabled: true));

public void Create(TimeSpan timeToDelayForCreation)
await channel.DeclareExchangeForConnection(_connection, OnMissingChannel.Create);
await channel.QueueDeclareAsync(_channelName.Value, false, false, false, null);
if (_routingKeys.Any())
{
var connectionFactory = new ConnectionFactory {Uri = _connection.AmpqUri.Uri};
using (var connection = connectionFactory.CreateConnection())
foreach (RoutingKey routingKey in _routingKeys)
{
using (var channel = connection.CreateModel())
{
channel.DeclareExchangeForConnection(_connection, OnMissingChannel.Create);
channel.QueueDeclare(_channelName.Value, false, false, false, null);
if (_routingKeys.Any())
{
foreach (RoutingKey routingKey in _routingKeys)
channel.QueueBind(_channelName.Value, _connection.Exchange.Name, routingKey);
}
else
{
channel.QueueBind(_channelName.Value, _connection.Exchange.Name, _channelName);
}

}
await channel.QueueBindAsync(_channelName.Value, _connection.Exchange.Name, routingKey);
}

//We need to delay to actually create these queues before we send to them
Task.Delay(timeToDelayForCreation).Wait();
}
}
}
else
{
await channel.QueueBindAsync(_channelName.Value, _connection.Exchange.Name, _channelName);
}

//We need to delay to actually create these queues before we send to them
// await Task.Delay(timeToDelayForCreation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public RMQBufferedConsumerTests()
_messageConsumer = new RmqMessageConsumer(connection:rmqConnection, queueName:_channelName, routingKey:_routingKey, isDurable:false, highAvailability:false, batchSize:BatchSize);

//create the queue, so that we can receive messages posted to it
new QueueFactory(rmqConnection, _channelName, new RoutingKeys([_routingKey])).Create(TimeSpan.FromMilliseconds(3000));
new QueueFactory(rmqConnection, _channelName, new RoutingKeys(_routingKey)).Create().Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public RmqMessageConsumerMultipleTopicTests()
_messageProducer = new RmqMessageProducer(rmqConnection);
_messageConsumer = new RmqMessageConsumer(rmqConnection, queueName , topics, false, false);

new QueueFactory(rmqConnection, queueName, topics).Create(TimeSpan.FromMilliseconds(3000));
new QueueFactory(rmqConnection, queueName, topics).Create().Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public RmqMessageProducerConfirmationsSendMessageTests ()

//we need a queue to avoid a discard
new QueueFactory(rmqConnection, new ChannelName(Guid.NewGuid().ToString()), new RoutingKeys(_message.Header.Topic))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ public RmqMessageProducerConfirmationsSendMessageAsyncTests()

//we need a queue to avoid a discard
new QueueFactory(rmqConnection, new ChannelName(Guid.NewGuid().ToString()), new RoutingKeys(_message.Header.Topic))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
public async Task When_confirming_posting_a_message_via_the_messaging_gateway_async()
{
//The RMQ client doesn't support async, so this is async over sync, but let's check it works all the same
await _messageProducer.SendAsync(_message);

await Task.Delay(500);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public RmqAssumeExistingInfrastructureTests()

//This creates the infrastructure we want
new QueueFactory(rmqConnection, queueName, new RoutingKeys( _message.Header.Topic))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public RmqValidateExistingInfrastructureTests()

//This creates the infrastructure we want
new QueueFactory(rmqConnection, queueName, new RoutingKeys(routingKey))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public RmqMessageProducerSendPersistentMessageTests()
_messageConsumer = new RmqMessageConsumer(rmqConnection, queueName, _message.Header.Topic, false);

new QueueFactory(rmqConnection, queueName, new RoutingKeys( _message.Header.Topic))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public RmqMessageProducerSendMessageTests()
_messageConsumer = new RmqMessageConsumer(rmqConnection, queueName, _message.Header.Topic, false);

new QueueFactory(rmqConnection, queueName, new RoutingKeys(_message.Header.Topic))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public RmqMessageProducerDelayedMessageTests()
_messageConsumer = new RmqMessageConsumer(rmqConnection, queueName, routingKey, false);

new QueueFactory(rmqConnection, queueName, new RoutingKeys([routingKey]))
.Create(TimeSpan.FromMilliseconds(3000));
.Create()
.Wait();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ THE SOFTWARE. */
using System;
using Paramore.Brighter.MessagingGateway.RMQ;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;

namespace Paramore.Brighter.RMQ.Tests.TestDoubles
Expand Down

0 comments on commit 407091e

Please sign in to comment.