Skip to content

Commit

Permalink
Handle Null message Bodies in the outbox #2235 (#2316)
Browse files Browse the repository at this point in the history
  • Loading branch information
preardon authored Oct 16, 2022
1 parent ac5b41c commit db8e6b1
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 17 deletions.
12 changes: 12 additions & 0 deletions samples/OpenTelemetry/Consumer/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"Consumer": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:52053;http://localhost:52054"
}
}
}
12 changes: 12 additions & 0 deletions samples/OpenTelemetry/Sweeper/Properties/launchSettings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"profiles": {
"Sweeper": {
"commandName": "Project",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
},
"applicationUrl": "https://localhost:52051;http://localhost:52052"
}
}
}
10 changes: 7 additions & 3 deletions src/Paramore.Brighter.Outbox.MsSql/MsSqlOutbox.cs
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ private SqlParameter[] InitAddDbParameters(Message message, int? position = null
CreateSqlParameter($"{prefix}ReplyTo", message.Header.ReplyTo),
CreateSqlParameter($"{prefix}ContentType", message.Header.ContentType),
CreateSqlParameter($"{prefix}HeaderBag", bagJson),
CreateSqlParameter($"{prefix}Body", message.Body.Value)
CreateSqlParameter($"{prefix}Body", message.Body?.Value)
};
return parameters;
}
Expand Down Expand Up @@ -810,8 +810,12 @@ private Message MapAMessage(SqlDataReader dr)
}
}
}

var body = new MessageBody(dr.GetString(dr.GetOrdinal("Body")));

var bodyOrdinal = dr.GetOrdinal("Body");
string messageBody = string.Empty;
if(!dr.IsDBNull(bodyOrdinal))
messageBody = dr.GetString(bodyOrdinal);
var body = new MessageBody(messageBody);

return new Message(header, body);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#region Licence
#region Licence

/* The MIT License (MIT)
Copyright © 2014 Francesco Pighi <[email protected]>
Expand Down Expand Up @@ -38,7 +38,7 @@ public class SqlOutboxWritingMessageTests : IDisposable
private readonly string _key3 = "name3";
private readonly string _key4 = "name4";
private readonly string _key5 = "name5";
private readonly Message _message;
private Message _message;
private readonly MsSqlOutbox _sqlOutbox;
private Message _storedMessage;
private readonly string _value1 = "value1";
Expand All @@ -47,14 +47,15 @@ public class SqlOutboxWritingMessageTests : IDisposable
private readonly Guid _value4 = Guid.NewGuid();
private readonly DateTime _value5 = DateTime.UtcNow;
private readonly MsSqlTestHelper _msSqlTestHelper;
private readonly MessageHeader _messageHeader;

public SqlOutboxWritingMessageTests()
{
_msSqlTestHelper = new MsSqlTestHelper();
_msSqlTestHelper.SetupMessageDb();

_sqlOutbox = new MsSqlOutbox(_msSqlTestHelper.OutboxConfiguration);
var messageHeader = new MessageHeader(
_messageHeader = new MessageHeader(
messageId:Guid.NewGuid(),
topic: "test_topic",
messageType: MessageType.MT_DOCUMENT,
Expand All @@ -64,23 +65,40 @@ public SqlOutboxWritingMessageTests()
correlationId: Guid.NewGuid(),
replyTo: "ReplyAddress",
contentType: "text/plain");
messageHeader.Bag.Add(_key1, _value1);
messageHeader.Bag.Add(_key2, _value2);
messageHeader.Bag.Add(_key3, _value3);
messageHeader.Bag.Add(_key4, _value4);
messageHeader.Bag.Add(_key5, _value5);
_messageHeader.Bag.Add(_key1, _value1);
_messageHeader.Bag.Add(_key2, _value2);
_messageHeader.Bag.Add(_key3, _value3);
_messageHeader.Bag.Add(_key4, _value4);
_messageHeader.Bag.Add(_key5, _value5);
}

_message = new Message(messageHeader, new MessageBody("message body"));
[Fact]
public void When_Writing_A_Message_To_The_MSSQL_Outbox()
{
_message = new Message(_messageHeader, new MessageBody("message body"));
_sqlOutbox.Add(_message);

AssertMessage();
}

[Fact]
public void When_Writing_A_Message_To_The_MSSQL_Outbox()
public void When_Writing_A_Message_With_a_Null_To_The_MSSQL_Outbox()
{
_message = new Message(_messageHeader, null);
_sqlOutbox.Add(_message);

AssertMessage();
}

private void AssertMessage()
{
_storedMessage = _sqlOutbox.Get(_message.Id);

//should read the message from the sql outbox
_storedMessage.Body.Value.Should().Be(_message.Body.Value);
if (!string.IsNullOrEmpty(_storedMessage.Body.Value))
_storedMessage.Body.Value.Should().Be(_message.Body.Value);
else
Assert.Null(_message.Body);
//should read the header from the sql outbox
_storedMessage.Header.Topic.Should().Be(_message.Header.Topic);
_storedMessage.Header.MessageType.Should().Be(_message.Header.MessageType);
Expand All @@ -90,8 +108,8 @@ public void When_Writing_A_Message_To_The_MSSQL_Outbox()
_storedMessage.Header.CorrelationId.Should().Be(_message.Header.CorrelationId);
_storedMessage.Header.ReplyTo.Should().Be(_message.Header.ReplyTo);
_storedMessage.Header.ContentType.Should().Be(_message.Header.ContentType);


//Bag serialization
_storedMessage.Header.Bag.ContainsKey(_key1).Should().BeTrue();
_storedMessage.Header.Bag[_key1].Should().Be(_value1);
Expand All @@ -103,7 +121,7 @@ public void When_Writing_A_Message_To_The_MSSQL_Outbox()
_storedMessage.Header.Bag[_key4].Should().Be(_value4);
_storedMessage.Header.Bag.ContainsKey(_key5).Should().BeTrue();
_storedMessage.Header.Bag[_key5].Should().Be(_value5);
}
}

public void Dispose()
{
Expand Down

0 comments on commit db8e6b1

Please sign in to comment.