diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageGateway.cs b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageGateway.cs index bedd662440..46598841b0 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageGateway.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ/RmqMessageGateway.cs @@ -122,29 +122,35 @@ private void ConnectWithRetry(ChannelName queueName, OnMissingChannel makeExchan new Dictionary { { "queueName", queueName.Value } }); } - protected virtual async Task ConnectToBroker(OnMissingChannel makeExchange, + protected virtual void ConnectToBroker(OnMissingChannel makeExchange) => + ConnectToBrokerAsync(makeExchange).GetAwaiter().GetResult(); + + protected virtual async Task ConnectToBrokerAsync(OnMissingChannel makeExchange, CancellationToken cancellationToken = default) { if (Channel == null || Channel.IsClosed) { - var connection = - new RmqMessageGatewayConnectionPool(Connection.Name, Connection.Heartbeat).GetConnection( - _connectionFactory); + var connection = await new RmqMessageGatewayConnectionPool(Connection.Name, Connection.Heartbeat) + .GetConnectionAsync(_connectionFactory, cancellationToken); - connection.ConnectionBlockedAsync += HandleBlocked; - connection.ConnectionUnblockedAsync += HandleUnBlocked; + connection.ConnectionBlockedAsync += HandleBlockedAsync; + connection.ConnectionUnblockedAsync += HandleUnBlockedAsync; s_logger.LogDebug("RMQMessagingGateway: Opening channel to Rabbit MQ on {URL}", Connection.AmpqUri.GetSanitizedUri()); - Channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken); + Channel = await connection.CreateChannelAsync( + new CreateChannelOptions( + publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true), + cancellationToken); //desired state configuration of the exchange await Channel.DeclareExchangeForConnection(Connection, makeExchange, cancellationToken: cancellationToken); } } - private Task HandleBlocked(object sender, ConnectionBlockedEventArgs args) + private Task HandleBlockedAsync(object sender, ConnectionBlockedEventArgs args) { s_logger.LogWarning("RMQMessagingGateway: Subscription to {URL} blocked. Reason: {ErrorMessage}", Connection.AmpqUri.GetSanitizedUri(), args.Reason); @@ -152,7 +158,7 @@ private Task HandleBlocked(object sender, ConnectionBlockedEventArgs args) return Task.CompletedTask; } - private Task HandleUnBlocked(object sender, AsyncEventArgs args) + private Task HandleUnBlockedAsync(object sender, AsyncEventArgs args) { s_logger.LogInformation("RMQMessagingGateway: Subscription to {URL} unblocked", Connection.AmpqUri.GetSanitizedUri()); diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/TestHelpers.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/TestHelpers.cs index a0670ed0d1..fae2a19134 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/TestHelpers.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/TestHelpers.cs @@ -1,4 +1,5 @@ #region Licence + /* The MIT License (MIT) Copyright © 2014 Ian Cooper @@ -22,53 +23,50 @@ THE SOFTWARE. */ #endregion -using System; using System.Linq; using System.Threading.Tasks; using Paramore.Brighter.MessagingGateway.RMQ; using RabbitMQ.Client; -namespace Paramore.Brighter.RMQ.Tests.MessagingGateway +namespace Paramore.Brighter.RMQ.Tests.MessagingGateway; + +internal class QueueFactory { + private readonly RmqMessagingGatewayConnection _connection; + private readonly ChannelName _channelName; + private readonly RoutingKeys _routingKeys; - internal class QueueFactory + public QueueFactory(RmqMessagingGatewayConnection connection, ChannelName channelName, RoutingKeys routingKeys) { - private readonly RmqMessagingGatewayConnection _connection; - private readonly ChannelName _channelName; - private readonly RoutingKeys _routingKeys; + _connection = connection; + _channelName = channelName; + _routingKeys = routingKeys; + } - public QueueFactory(RmqMessagingGatewayConnection connection, ChannelName channelName, RoutingKeys routingKeys) - { - _connection = connection; - _channelName = channelName; - _routingKeys = routingKeys; - } + public async Task Create() + { + var connectionFactory = new ConnectionFactory { Uri = _connection.AmpqUri.Uri }; + await using var connection = await connectionFactory.CreateConnectionAsync(); + await using var channel = + await connection.CreateChannelAsync(new CreateChannelOptions( + publisherConfirmationsEnabled: true, + publisherConfirmationTrackingEnabled: true)); - public void Create(TimeSpan timeToDelayForCreation) + await channel.DeclareExchangeForConnection(_connection, OnMissingChannel.Create); + await channel.QueueDeclareAsync(_channelName.Value, false, false, false, null); + if (_routingKeys.Any()) { - var connectionFactory = new ConnectionFactory {Uri = _connection.AmpqUri.Uri}; - using (var connection = connectionFactory.CreateConnection()) + foreach (RoutingKey routingKey in _routingKeys) { - using (var channel = connection.CreateModel()) - { - channel.DeclareExchangeForConnection(_connection, OnMissingChannel.Create); - channel.QueueDeclare(_channelName.Value, false, false, false, null); - if (_routingKeys.Any()) - { - foreach (RoutingKey routingKey in _routingKeys) - channel.QueueBind(_channelName.Value, _connection.Exchange.Name, routingKey); - } - else - { - channel.QueueBind(_channelName.Value, _connection.Exchange.Name, _channelName); - } - - } + await channel.QueueBindAsync(_channelName.Value, _connection.Exchange.Name, routingKey); } - - //We need to delay to actually create these queues before we send to them - Task.Delay(timeToDelayForCreation).Wait(); } - } -} + else + { + await channel.QueueBindAsync(_channelName.Value, _connection.Exchange.Name, _channelName); + } + //We need to delay to actually create these queues before we send to them + // await Task.Delay(timeToDelayForCreation); + } +} diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_a_message_consumer_reads_multiple_messages.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_a_message_consumer_reads_multiple_messages.cs index b93712842b..2371035877 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_a_message_consumer_reads_multiple_messages.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_a_message_consumer_reads_multiple_messages.cs @@ -27,7 +27,7 @@ public RMQBufferedConsumerTests() _messageConsumer = new RmqMessageConsumer(connection:rmqConnection, queueName:_channelName, routingKey:_routingKey, isDurable:false, highAvailability:false, batchSize:BatchSize); //create the queue, so that we can receive messages posted to it - new QueueFactory(rmqConnection, _channelName, new RoutingKeys([_routingKey])).Create(TimeSpan.FromMilliseconds(3000)); + new QueueFactory(rmqConnection, _channelName, new RoutingKeys(_routingKey)).Create().Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_binding_a_channel_to_multiple_topics.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_binding_a_channel_to_multiple_topics.cs index f2413f9f65..a1d16c4d05 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_binding_a_channel_to_multiple_topics.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_binding_a_channel_to_multiple_topics.cs @@ -39,7 +39,7 @@ public RmqMessageConsumerMultipleTopicTests() _messageProducer = new RmqMessageProducer(rmqConnection); _messageConsumer = new RmqMessageConsumer(rmqConnection, queueName , topics, false, false); - new QueueFactory(rmqConnection, queueName, topics).Create(TimeSpan.FromMilliseconds(3000)); + new QueueFactory(rmqConnection, queueName, topics).Create().Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs index 4d94ff0cf7..b827fb4fbb 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway.cs @@ -68,7 +68,8 @@ public RmqMessageProducerConfirmationsSendMessageTests () //we need a queue to avoid a discard new QueueFactory(rmqConnection, new ChannelName(Guid.NewGuid().ToString()), new RoutingKeys(_message.Header.Topic)) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs index afcb8f864c..81ff5e0abe 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_confirming_posting_a_message_via_the_messaging_gateway_async.cs @@ -68,13 +68,13 @@ public RmqMessageProducerConfirmationsSendMessageAsyncTests() //we need a queue to avoid a discard new QueueFactory(rmqConnection, new ChannelName(Guid.NewGuid().ToString()), new RoutingKeys(_message.Header.Topic)) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] public async Task When_confirming_posting_a_message_via_the_messaging_gateway_async() { - //The RMQ client doesn't support async, so this is async over sync, but let's check it works all the same await _messageProducer.SendAsync(_message); await Task.Delay(500); diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_assert.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_assert.cs index ed006158fe..accdc92e45 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_assert.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_assert.cs @@ -37,7 +37,8 @@ public RmqAssumeExistingInfrastructureTests() //This creates the infrastructure we want new QueueFactory(rmqConnection, queueName, new RoutingKeys( _message.Header.Topic)) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_validate.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_validate.cs index a52300b157..eb7374d961 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_validate.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_infrastructure_exists_can_validate.cs @@ -37,7 +37,8 @@ public RmqValidateExistingInfrastructureTests() //This creates the infrastructure we want new QueueFactory(rmqConnection, queueName, new RoutingKeys(routingKey)) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_to_persist_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_to_persist_via_the_messaging_gateway.cs index 5a6643d1ba..1ea260868e 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_to_persist_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_to_persist_via_the_messaging_gateway.cs @@ -33,7 +33,8 @@ public RmqMessageProducerSendPersistentMessageTests() _messageConsumer = new RmqMessageConsumer(rmqConnection, queueName, _message.Header.Topic, false); new QueueFactory(rmqConnection, queueName, new RoutingKeys( _message.Header.Topic)) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs index 691fce033e..73a6476111 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_posting_a_message_via_the_messaging_gateway.cs @@ -56,7 +56,8 @@ public RmqMessageProducerSendMessageTests() _messageConsumer = new RmqMessageConsumer(rmqConnection, queueName, _message.Header.Topic, false); new QueueFactory(rmqConnection, queueName, new RoutingKeys(_message.Header.Topic)) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_reading_a_delayed_message_via_the_messaging_gateway.cs b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_reading_a_delayed_message_via_the_messaging_gateway.cs index d18a0fb8fd..52288eecdf 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_reading_a_delayed_message_via_the_messaging_gateway.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/MessagingGateway/When_reading_a_delayed_message_via_the_messaging_gateway.cs @@ -61,7 +61,8 @@ public RmqMessageProducerDelayedMessageTests() _messageConsumer = new RmqMessageConsumer(rmqConnection, queueName, routingKey, false); new QueueFactory(rmqConnection, queueName, new RoutingKeys([routingKey])) - .Create(TimeSpan.FromMilliseconds(3000)); + .Create() + .Wait(); } [Fact] diff --git a/tests/Paramore.Brighter.RMQ.Tests/TestDoubles/TestDoubleRmqMessageConsumer.cs b/tests/Paramore.Brighter.RMQ.Tests/TestDoubles/TestDoubleRmqMessageConsumer.cs index ddc3c49ffd..abdaebd4a2 100644 --- a/tests/Paramore.Brighter.RMQ.Tests/TestDoubles/TestDoubleRmqMessageConsumer.cs +++ b/tests/Paramore.Brighter.RMQ.Tests/TestDoubles/TestDoubleRmqMessageConsumer.cs @@ -25,6 +25,7 @@ THE SOFTWARE. */ using System; using Paramore.Brighter.MessagingGateway.RMQ; using RabbitMQ.Client; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; namespace Paramore.Brighter.RMQ.Tests.TestDoubles