From 7ad9585eef84030c0941588cb242b3141b8c8fcd Mon Sep 17 00:00:00 2001 From: Rafael Lillo Date: Mon, 20 Jan 2025 23:17:17 +0000 Subject: [PATCH] Fixes DefaultMapper & SNS/SQS Publish (#3488) * Fixes sample & JsonMapper * Move to ConcurrentDictionary --- .../GreetingsReceiverConsole/Program.cs | 7 ++- .../AWSTaskQueue/GreetingsSender/Program.cs | 12 +++++- .../SnsMessagePublisher.cs | 12 +++++- .../SqsMessageSender.cs | 8 +++- .../MessageMapperRegistry.cs | 43 ++++++++++++------- .../MessageMappers/JsonMessageMapper.cs | 4 +- 6 files changed, 62 insertions(+), 24 deletions(-) diff --git a/samples/TaskQueue/AWSTaskQueue/GreetingsReceiverConsole/Program.cs b/samples/TaskQueue/AWSTaskQueue/GreetingsReceiverConsole/Program.cs index b694dc12e6..7db2ddfc8c 100644 --- a/samples/TaskQueue/AWSTaskQueue/GreetingsReceiverConsole/Program.cs +++ b/samples/TaskQueue/AWSTaskQueue/GreetingsReceiverConsole/Program.cs @@ -66,7 +66,12 @@ public static async Task Main(string[] args) new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)), bufferSize: 10, timeOut: TimeSpan.FromMilliseconds(20), - lockTimeout: 30) + lockTimeout: 30, + sqsType: SnsSqsType.Fifo, + snsAttributes: new SnsAttributes + { + Type = SnsSqsType.Fifo + }) }; //create the gateway diff --git a/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs index 632152f373..23da574b27 100644 --- a/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs @@ -24,6 +24,7 @@ THE SOFTWARE. */ #endregion using System; +using System.Threading.Tasks; using System.Transactions; using Amazon; using Amazon.Runtime.CredentialManagement; @@ -40,7 +41,7 @@ namespace GreetingsSender { class Program { - static void Main(string[] args) + static async Task Main(string[] args) { Log.Logger = new LoggerConfiguration() .MinimumLevel.Debug() @@ -71,6 +72,15 @@ static void Main(string[] args) { Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()), RequestType = typeof(GreetingEvent) + }, + new() + { + Topic = new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)), + RequestType = typeof(FarewellEvent), + SnsAttributes = new SnsAttributes + { + Type = SnsSqsType.Fifo + } } } ).Create(); diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs index caf9bebd0d..8bda62e18f 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SnsMessagePublisher.cs @@ -56,8 +56,6 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien new() { StringValue = Convert.ToString(message.Header.MessageId), DataType = "String" }, [HeaderNames.Topic] = new() { StringValue = _topicArn, DataType = "String" }, [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" }, - [HeaderNames.CorrelationId] = - new() { StringValue = Convert.ToString(message.Header.CorrelationId), DataType = "String" }, [HeaderNames.HandledCount] = new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" }, [HeaderNames.MessageType] = @@ -67,6 +65,16 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien StringValue = Convert.ToString(message.Header.TimeStamp), DataType = "String" } }; + + if (!string.IsNullOrEmpty(message.Header.CorrelationId)) + { + messageAttributes[HeaderNames.CorrelationId] = new MessageAttributeValue + { + StringValue = Convert.ToString(message.Header.CorrelationId), + DataType = "String" + }; + } + if (_snsSqsType == SnsSqsType.Fifo) { diff --git a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs index 028d2b17e3..b508bae048 100644 --- a/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs +++ b/src/Paramore.Brighter.MessagingGateway.AWSSQS/SqsMessageSender.cs @@ -79,8 +79,6 @@ public SqsMessageSender(string queueUrl, SnsSqsType queueType, AmazonSQSClient c new() { StringValue = message.Header.MessageId, DataType = "String" }, [HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = "String" }, [HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" }, - [HeaderNames.CorrelationId] = - new() { StringValue = message.Header.CorrelationId, DataType = "String" }, [HeaderNames.HandledCount] = new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" }, [HeaderNames.MessageType] = @@ -102,6 +100,12 @@ public SqsMessageSender(string queueUrl, SnsSqsType queueType, AmazonSQSClient c messageAttributes.Add(HeaderNames.Subject, new MessageAttributeValue { StringValue = message.Header.Subject, DataType = "String" }); } + + if (!string.IsNullOrEmpty(message.Header.CorrelationId)) + { + messageAttributes.Add(HeaderNames.CorrelationId, + new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = "String" }); + } // we can set up to 10 attributes; we have set 6 above, so use a single JSON object as the bag var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options); diff --git a/src/Paramore.Brighter/MessageMapperRegistry.cs b/src/Paramore.Brighter/MessageMapperRegistry.cs index 5bf294e790..fad3b5e0e6 100644 --- a/src/Paramore.Brighter/MessageMapperRegistry.cs +++ b/src/Paramore.Brighter/MessageMapperRegistry.cs @@ -23,6 +23,7 @@ THE SOFTWARE. */ #endregion using System; +using System.Collections.Concurrent; using System.Collections.Generic; using Paramore.Brighter.MessageMappers; @@ -39,8 +40,8 @@ public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMappe { private readonly IAmAMessageMapperFactory? _messageMapperFactory; private readonly IAmAMessageMapperFactoryAsync? _messageMapperFactoryAsync; - private readonly Dictionary _messageMappers = new Dictionary(); - private readonly Dictionary _asyncMessageMappers = new Dictionary(); + private readonly ConcurrentDictionary _messageMappers = new(); + private readonly ConcurrentDictionary _asyncMessageMappers = new(); private readonly Type? _defaultMessageMapper; private readonly Type? _defaultMessageMapperAsync; @@ -74,11 +75,16 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, if (_messageMapperFactory is null) return null; - var messageMapperType = _messageMappers.ContainsKey(typeof(TRequest)) - ? _messageMappers[typeof(TRequest)] - : _defaultMessageMapper; + if (!_messageMappers.TryGetValue(typeof(TRequest), out var messageMapperType) && _defaultMessageMapper != null) + { + messageMapperType = _defaultMessageMapper.MakeGenericType(typeof(TRequest)); + _messageMappers.TryAdd(typeof(TRequest), messageMapperType); + } - if (messageMapperType is null) return null; + if (messageMapperType is null) + { + return null; + } return (IAmAMessageMapper)_messageMapperFactory.Create(messageMapperType); } @@ -92,12 +98,17 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory, { if (_messageMapperFactoryAsync is null) return null; - - var messageMapperType = _asyncMessageMappers.ContainsKey(typeof(TRequest)) - ? _asyncMessageMappers[typeof(TRequest)] - : _defaultMessageMapperAsync; - - if (messageMapperType is null) return null; + + if (!_asyncMessageMappers.TryGetValue(typeof(TRequest), out var messageMapperType) && _defaultMessageMapperAsync != null) + { + messageMapperType = _defaultMessageMapperAsync.MakeGenericType(typeof(TRequest)); + _asyncMessageMappers.TryAdd(typeof(TRequest), messageMapperType); + } + + if (messageMapperType is null) + { + return null; + } return (IAmAMessageMapperAsync)_messageMapperFactoryAsync.Create(messageMapperType); } @@ -113,7 +124,7 @@ public void Register() where TRequest : class, IReques if (_messageMappers.ContainsKey(typeof(TRequest))) throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", typeof(TRequest).Name)); - _messageMappers.Add(typeof(TRequest), typeof(TMessageMapper)); + _messageMappers.TryAdd(typeof(TRequest), typeof(TMessageMapper)); } /// @@ -127,7 +138,7 @@ public void Register(Type request, Type mapper) if (_messageMappers.ContainsKey(request)) throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", request.Name)); - _messageMappers.Add(request, mapper); + _messageMappers.TryAdd(request, mapper); } /// @@ -141,7 +152,7 @@ public void RegisterAsync() where TRequest : class, IR if (_asyncMessageMappers.ContainsKey(typeof(TRequest))) throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", typeof(TRequest).Name)); - _asyncMessageMappers.Add(typeof(TRequest), typeof(TMessageMapper)); + _asyncMessageMappers.TryAdd(typeof(TRequest), typeof(TMessageMapper)); } @@ -156,7 +167,7 @@ public void RegisterAsync(Type request, Type mapper) if (_asyncMessageMappers.ContainsKey(request)) throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", request.Name)); - _asyncMessageMappers.Add(request, mapper); + _asyncMessageMappers.TryAdd(request, mapper); } } } diff --git a/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs b/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs index 14292db370..39699d8503 100644 --- a/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs +++ b/src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs @@ -5,9 +5,9 @@ namespace Paramore.Brighter.MessageMappers; -public class JsonMessageMapper(RequestContext? context) : IAmAMessageMapper, IAmAMessageMapperAsync where TRequest : class, IRequest +public class JsonMessageMapper : IAmAMessageMapper, IAmAMessageMapperAsync where TRequest : class, IRequest { - public IRequestContext? Context { get; set; } = context; + public IRequestContext? Context { get; set; } public Task MapToMessageAsync(TRequest request, Publication publication, CancellationToken cancellationToken = default)