Skip to content

Commit

Permalink
Fixes DefaultMapper & SNS/SQS Publish (BrighterCommand#3488)
Browse files Browse the repository at this point in the history
* Fixes sample & JsonMapper

* Move to ConcurrentDictionary
  • Loading branch information
lillo42 authored Jan 20, 2025
1 parent db2d864 commit 7ad9585
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ public static async Task Main(string[] args)
new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)),
bufferSize: 10,
timeOut: TimeSpan.FromMilliseconds(20),
lockTimeout: 30)
lockTimeout: 30,
sqsType: SnsSqsType.Fifo,
snsAttributes: new SnsAttributes
{
Type = SnsSqsType.Fifo
})
};

//create the gateway
Expand Down
12 changes: 11 additions & 1 deletion samples/TaskQueue/AWSTaskQueue/GreetingsSender/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Threading.Tasks;
using System.Transactions;
using Amazon;
using Amazon.Runtime.CredentialManagement;
Expand All @@ -40,7 +41,7 @@ namespace GreetingsSender
{
class Program
{
static void Main(string[] args)
static async Task Main(string[] args)
{
Log.Logger = new LoggerConfiguration()
.MinimumLevel.Debug()
Expand Down Expand Up @@ -71,6 +72,15 @@ static void Main(string[] args)
{
Topic = new RoutingKey(typeof(GreetingEvent).FullName.ToValidSNSTopicName()),
RequestType = typeof(GreetingEvent)
},
new()
{
Topic = new RoutingKey(typeof(FarewellEvent).FullName.ToValidSNSTopicName(true)),
RequestType = typeof(FarewellEvent),
SnsAttributes = new SnsAttributes
{
Type = SnsSqsType.Fifo
}
}
}
).Create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
new() { StringValue = Convert.ToString(message.Header.MessageId), DataType = "String" },
[HeaderNames.Topic] = new() { StringValue = _topicArn, DataType = "String" },
[HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" },
[HeaderNames.CorrelationId] =
new() { StringValue = Convert.ToString(message.Header.CorrelationId), DataType = "String" },
[HeaderNames.HandledCount] =
new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" },
[HeaderNames.MessageType] =
Expand All @@ -67,6 +65,16 @@ public SnsMessagePublisher(string topicArn, AmazonSimpleNotificationServiceClien
StringValue = Convert.ToString(message.Header.TimeStamp), DataType = "String"
}
};

if (!string.IsNullOrEmpty(message.Header.CorrelationId))
{
messageAttributes[HeaderNames.CorrelationId] = new MessageAttributeValue
{
StringValue = Convert.ToString(message.Header.CorrelationId),
DataType = "String"
};
}


if (_snsSqsType == SnsSqsType.Fifo)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ public SqsMessageSender(string queueUrl, SnsSqsType queueType, AmazonSQSClient c
new() { StringValue = message.Header.MessageId, DataType = "String" },
[HeaderNames.Topic] = new() { StringValue = _queueUrl, DataType = "String" },
[HeaderNames.ContentType] = new() { StringValue = message.Header.ContentType, DataType = "String" },
[HeaderNames.CorrelationId] =
new() { StringValue = message.Header.CorrelationId, DataType = "String" },
[HeaderNames.HandledCount] =
new() { StringValue = Convert.ToString(message.Header.HandledCount), DataType = "String" },
[HeaderNames.MessageType] =
Expand All @@ -102,6 +100,12 @@ public SqsMessageSender(string queueUrl, SnsSqsType queueType, AmazonSQSClient c
messageAttributes.Add(HeaderNames.Subject,
new MessageAttributeValue { StringValue = message.Header.Subject, DataType = "String" });
}

if (!string.IsNullOrEmpty(message.Header.CorrelationId))
{
messageAttributes.Add(HeaderNames.CorrelationId,
new MessageAttributeValue { StringValue = message.Header.CorrelationId, DataType = "String" });
}

// we can set up to 10 attributes; we have set 6 above, so use a single JSON object as the bag
var bagJson = JsonSerializer.Serialize(message.Header.Bag, JsonSerialisationOptions.Options);
Expand Down
43 changes: 27 additions & 16 deletions src/Paramore.Brighter/MessageMapperRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ THE SOFTWARE. */
#endregion

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using Paramore.Brighter.MessageMappers;

Expand All @@ -39,8 +40,8 @@ public class MessageMapperRegistry : IAmAMessageMapperRegistry, IAmAMessageMappe
{
private readonly IAmAMessageMapperFactory? _messageMapperFactory;
private readonly IAmAMessageMapperFactoryAsync? _messageMapperFactoryAsync;
private readonly Dictionary<Type, Type> _messageMappers = new Dictionary<Type, Type>();
private readonly Dictionary<Type, Type> _asyncMessageMappers = new Dictionary<Type, Type>();
private readonly ConcurrentDictionary<Type, Type> _messageMappers = new();
private readonly ConcurrentDictionary<Type, Type> _asyncMessageMappers = new();
private readonly Type? _defaultMessageMapper;
private readonly Type? _defaultMessageMapperAsync;

Expand Down Expand Up @@ -74,11 +75,16 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory,
if (_messageMapperFactory is null)
return null;

var messageMapperType = _messageMappers.ContainsKey(typeof(TRequest))
? _messageMappers[typeof(TRequest)]
: _defaultMessageMapper;
if (!_messageMappers.TryGetValue(typeof(TRequest), out var messageMapperType) && _defaultMessageMapper != null)
{
messageMapperType = _defaultMessageMapper.MakeGenericType(typeof(TRequest));
_messageMappers.TryAdd(typeof(TRequest), messageMapperType);
}

if (messageMapperType is null) return null;
if (messageMapperType is null)
{
return null;
}

return (IAmAMessageMapper<TRequest>)_messageMapperFactory.Create(messageMapperType);
}
Expand All @@ -92,12 +98,17 @@ public MessageMapperRegistry(IAmAMessageMapperFactory? messageMapperFactory,
{
if (_messageMapperFactoryAsync is null)
return null;

var messageMapperType = _asyncMessageMappers.ContainsKey(typeof(TRequest))
? _asyncMessageMappers[typeof(TRequest)]
: _defaultMessageMapperAsync;

if (messageMapperType is null) return null;

if (!_asyncMessageMappers.TryGetValue(typeof(TRequest), out var messageMapperType) && _defaultMessageMapperAsync != null)
{
messageMapperType = _defaultMessageMapperAsync.MakeGenericType(typeof(TRequest));
_asyncMessageMappers.TryAdd(typeof(TRequest), messageMapperType);
}

if (messageMapperType is null)
{
return null;
}

return (IAmAMessageMapperAsync<TRequest>)_messageMapperFactoryAsync.Create(messageMapperType);
}
Expand All @@ -113,7 +124,7 @@ public void Register<TRequest, TMessageMapper>() where TRequest : class, IReques
if (_messageMappers.ContainsKey(typeof(TRequest)))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", typeof(TRequest).Name));

_messageMappers.Add(typeof(TRequest), typeof(TMessageMapper));
_messageMappers.TryAdd(typeof(TRequest), typeof(TMessageMapper));
}

/// <summary>
Expand All @@ -127,7 +138,7 @@ public void Register(Type request, Type mapper)
if (_messageMappers.ContainsKey(request))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", request.Name));

_messageMappers.Add(request, mapper);
_messageMappers.TryAdd(request, mapper);
}

/// <summary>
Expand All @@ -141,7 +152,7 @@ public void RegisterAsync<TRequest, TMessageMapper>() where TRequest : class, IR
if (_asyncMessageMappers.ContainsKey(typeof(TRequest)))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", typeof(TRequest).Name));

_asyncMessageMappers.Add(typeof(TRequest), typeof(TMessageMapper));
_asyncMessageMappers.TryAdd(typeof(TRequest), typeof(TMessageMapper));

}

Expand All @@ -156,7 +167,7 @@ public void RegisterAsync(Type request, Type mapper)
if (_asyncMessageMappers.ContainsKey(request))
throw new ArgumentException(string.Format("Message type {0} already has a mapper; only one mapper can be registered per type", request.Name));

_asyncMessageMappers.Add(request, mapper);
_asyncMessageMappers.TryAdd(request, mapper);
}
}
}
4 changes: 2 additions & 2 deletions src/Paramore.Brighter/MessageMappers/JsonMessageMapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

namespace Paramore.Brighter.MessageMappers;

public class JsonMessageMapper<TRequest>(RequestContext? context) : IAmAMessageMapper<TRequest>, IAmAMessageMapperAsync<TRequest> where TRequest : class, IRequest
public class JsonMessageMapper<TRequest> : IAmAMessageMapper<TRequest>, IAmAMessageMapperAsync<TRequest> where TRequest : class, IRequest
{
public IRequestContext? Context { get; set; } = context;
public IRequestContext? Context { get; set; }

public Task<Message> MapToMessageAsync(TRequest request, Publication publication,
CancellationToken cancellationToken = default)
Expand Down

0 comments on commit 7ad9585

Please sign in to comment.