Skip to content

Commit

Permalink
(#255) Move serialization into exception handled execution
Browse files Browse the repository at this point in the history
So that if there is a mismatch in serializer between clients
they will post to error exchange or propegate back to requester
  • Loading branch information
pardahlman committed Aug 19, 2017
1 parent c66be0c commit 15c5b52
Show file tree
Hide file tree
Showing 13 changed files with 214 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ public static class RespondMessageContextExtension
Pipe = RespondExtension.ConsumePipe + (consume => consume
.Replace<RespondExceptionMiddleware, RespondExceptionMiddleware>(args: new RespondExceptionOptions
{
InnerPipe = p => p.Use<RespondInvokationMiddleware>(new HandlerInvokationOptions
{
HandlerArgsFunc = context => new[] { context.GetMessage(), context.GetMessageContext() }
})
InnerPipe = p => p
.Use<BodyDeserializationMiddleware>(new MessageDeserializationOptions
{
BodyTypeFunc = context => context.GetRequestMessageType()
})
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(RespondStage.MessageDeserialized))
.Use<RespondInvokationMiddleware>(new HandlerInvokationOptions
{
HandlerArgsFunc = context => new[] { context.GetMessage(), context.GetMessageContext() }
})
})
.Use<HeaderDeserializationMiddleware>(new HeaderDeserializationOptions
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,20 @@ public static class SubscribeMessageContextExtension
HeaderTypeFunc = c => c.GetMessageContextType(),
ContextSaveAction = (pipeCtx, msgCtx) => pipeCtx.Properties.TryAdd(PipeKey.MessageContext, msgCtx)
})
.Use<BodyDeserializationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageDeserialized))
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(MessageContextSubscibeStage.MessageContextDeserialized))
.Use<SubscriptionExceptionMiddleware>(new SubscriptionExceptionOptions
{
InnerPipe = p => p.Use<HandlerInvokationMiddleware>(new HandlerInvokationOptions
{
HandlerArgsFunc = context => new[]
InnerPipe = p => p
.Use<BodyDeserializationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageDeserialized))
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(MessageContextSubscibeStage.MessageContextDeserialized))
.Use<HandlerInvokationMiddleware>(new HandlerInvokationOptions
{
context.GetMessage(),
context.GetMessageContextResolver()?.Invoke(context) ?? context.GetMessageContext()
}
})
HandlerArgsFunc = context => new[]
{
context.GetMessage(),
context.GetMessageContextResolver()?.Invoke(context) ?? context.GetMessageContext()
}
})
})
.Use<ExplicitAckMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(MessageContextSubscibeStage.HandlerInvoked));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ public static class MessageAndContextTriggerExtension
.Replace<SubscriptionExceptionMiddleware, SubscriptionExceptionMiddleware>(args: new SubscriptionExceptionOptions
{
InnerPipe = inner => inner
.Use<BodyDeserializationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageDeserialized))
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(MessageContextSubscibeStage.MessageContextDeserialized))
.Use<ModelIdMiddleware>()
.Use<GlobalLockMiddleware>()
.Use<RetrieveStateMachineMiddleware>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
using System;
using RabbitMQ.Client;
using RawRabbit.Configuration.Consume;
using RawRabbit.Configuration.Queue;
using RawRabbit.Operations.Request.Core;
using RawRabbit.Pipe;
using RawRabbit.Pipe.Middleware;
using RawRabbit.Serialization;

namespace RawRabbit.Operations.Request.Middleware
{
public class BasicPropertiesMiddleware : Pipe.Middleware.BasicPropertiesMiddleware
{
public BasicPropertiesMiddleware(BasicPropertiesOptions options) :base(options)
public BasicPropertiesMiddleware(ISerializer serializer, BasicPropertiesOptions options) :base(serializer, options)
{ }

protected override void ModifyBasicProperties(IPipeContext context, IBasicProperties props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class ResponderExceptionMiddleware : Pipe.Middleware.Middleware
protected Func<ExceptionInformation, IPipeContext, Task> HandlerFunc;
private readonly ILog _logger = LogProvider.For<ResponderExceptionMiddleware>();
protected Func<IPipeContext, Type> ResponseTypeFunc;
private Func<IPipeContext, BasicDeliverEventArgs> _deliveryArgFunc;
private readonly Func<IPipeContext, BasicDeliverEventArgs> _deliveryArgFunc;

public ResponderExceptionMiddleware(ISerializer serializer, ResponderExceptionOptions options = null)
{
Expand Down
13 changes: 7 additions & 6 deletions src/RawRabbit.Operations.Respond/RespondExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@ public static class RespondExtension
{
public static readonly Action<IPipeBuilder> ConsumePipe = pipe => pipe
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageRecieved))
.Use<BodyDeserializationMiddleware>(new MessageDeserializationOptions
{
BodyTypeFunc = context => context.GetRequestMessageType()
})
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(RespondStage.MessageDeserialized))
.Use<RespondExceptionMiddleware>(new RespondExceptionOptions { InnerPipe = p => p.Use<RespondInvokationMiddleware>() })
.Use<RespondExceptionMiddleware>(new RespondExceptionOptions { InnerPipe = p => p
.Use<BodyDeserializationMiddleware>(new MessageDeserializationOptions
{
BodyTypeFunc = context => context.GetRequestMessageType()
})
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(RespondStage.MessageDeserialized))
.Use<RespondInvokationMiddleware>() })
.Use<ExplicitAckMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(RespondStage.HandlerInvoked))
.Use<BasicPropertiesMiddleware>(new BasicPropertiesOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public static class TriggerFromMessageExtension
.Replace<SubscriptionExceptionMiddleware, SubscriptionExceptionMiddleware>(args: new SubscriptionExceptionOptions
{
InnerPipe = inner => inner
.Use<BodyDeserializationMiddleware>()
.Use<ModelIdMiddleware>()
.Use<GlobalLockMiddleware>()
.Use<RetrieveStateMachineMiddleware>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Threading;
using System.Threading.Tasks;
using RawRabbit.Common;
using RawRabbit.Configuration.Consumer;
using RawRabbit.Operations.Subscribe.Middleware;
using RawRabbit.Operations.Subscribe.Stages;
using RawRabbit.Pipe;
Expand All @@ -14,9 +13,10 @@ public static class SubscribeMessageExtension
{
public static readonly Action<IPipeBuilder> ConsumePipe = pipe => pipe
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageRecieved))
.Use<BodyDeserializationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageDeserialized))
.Use<SubscriptionExceptionMiddleware>(new SubscriptionExceptionOptions { InnerPipe = p => p.Use<SubscribeInvokationMiddleware>()})
.Use<SubscriptionExceptionMiddleware>(new SubscriptionExceptionOptions { InnerPipe = p => p
.Use<BodyDeserializationMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.MessageDeserialized))
.Use<SubscribeInvokationMiddleware>()})
.Use<ExplicitAckMiddleware>()
.Use<StageMarkerMiddleware>(StageMarkerOptions.For(StageMarker.HandlerInvoked));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ public static class RawRabbitDependencyRegisterExtension
{
public static IDependencyRegister AddRawRabbit(this IDependencyRegister register, RawRabbitOptions options = null)
{
var clientBuilder = new ClientBuilder();
options?.Plugins?.Invoke(clientBuilder);
clientBuilder.DependencyInjection?.Invoke(register);
register.AddSingleton(clientBuilder.PipeBuilderAction);

register
.AddSingleton(options?.ClientConfiguration ?? RawRabbitConfiguration.Local)
.AddSingleton<IConnectionFactory, ConnectionFactory>(provider =>
Expand Down Expand Up @@ -84,6 +79,11 @@ public static IDependencyRegister AddRawRabbit(this IDependencyRegister register
.AddTransient<IExtendedPipeBuilder, PipeBuilder>(resolver => new PipeBuilder(resolver))
.AddSingleton<IPipeBuilderFactory>(provider => new PipeBuilderFactory(provider));

var clientBuilder = new ClientBuilder();
options?.Plugins?.Invoke(clientBuilder);
clientBuilder.DependencyInjection?.Invoke(register);
register.AddSingleton(clientBuilder.PipeBuilderAction);

options?.DependencyInjection?.Invoke(register);
return register;
}
Expand Down
8 changes: 6 additions & 2 deletions src/RawRabbit/Pipe/Middleware/BasicPropertiesMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Framing;
using RawRabbit.Serialization;

namespace RawRabbit.Pipe.Middleware
{
Expand All @@ -16,19 +17,22 @@ public class BasicPropertiesOptions

public class BasicPropertiesMiddleware : Middleware
{
protected ISerializer Serializer;
protected Func<IPipeContext, IBasicProperties> GetOrCreatePropsFunc;
protected Action<IPipeContext, IBasicProperties> PropertyModifier;
protected Action<IPipeContext, IBasicProperties> PostCreateAction;

public BasicPropertiesMiddleware(BasicPropertiesOptions options = null)
public BasicPropertiesMiddleware(ISerializer serializer, BasicPropertiesOptions options = null)
{
Serializer = serializer;
PropertyModifier = options?.PropertyModier ?? ((ctx, props) => ctx.Get<Action<IBasicProperties>>(PipeKey.BasicPropertyModifier)?.Invoke(props));
PostCreateAction = options?.PostCreateAction;
GetOrCreatePropsFunc = options?.GetOrCreatePropsFunc ?? (ctx => ctx.GetBasicProperties() ?? new BasicProperties
{
MessageId = Guid.NewGuid().ToString(),
Headers = new Dictionary<string, object>(),
Persistent = ctx.GetClientConfiguration().PersistentDeliveryMode
Persistent = ctx.GetClientConfiguration().PersistentDeliveryMode,
ContentType = Serializer.ContentType
});
}

Expand Down
26 changes: 21 additions & 5 deletions src/RawRabbit/Pipe/Middleware/BodyDeserializationMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace RawRabbit.Pipe.Middleware
public class MessageDeserializationOptions
{
public Func<IPipeContext, Type> BodyTypeFunc { get; set; }
public Func<IPipeContext, string> BodyContentTypeFunc { get; set; }
public Func<IPipeContext, byte[]> BodyFunc { get; set; }
public Action<IPipeContext, object> PersistAction { get; set; }
}
Expand All @@ -19,6 +20,7 @@ public class BodyDeserializationMiddleware : Middleware
protected readonly ISerializer Serializer;
protected Func<IPipeContext, Type> MessageTypeFunc;
protected Func<IPipeContext, byte[]> BodyBytesFunc;
protected Func<IPipeContext, string> BodyContentTypeFunc { get; set; }
protected Action<IPipeContext, object> PersistAction;
private readonly ILog _logger = LogProvider.For<BodyDeserializationMiddleware>();

Expand All @@ -28,23 +30,37 @@ public BodyDeserializationMiddleware(ISerializer serializer, MessageDeserializat
MessageTypeFunc = options?.BodyTypeFunc ?? (context => context.GetMessageType());
BodyBytesFunc = options?.BodyFunc ?? (context =>context.GetDeliveryEventArgs()?.Body);
PersistAction = options?.PersistAction ?? ((context, msg) => context.Properties.TryAdd(PipeKey.Message, msg));
BodyContentTypeFunc = options?.BodyContentTypeFunc ?? (context => context.GetDeliveryEventArgs()?.BasicProperties.ContentType);
}

public override async Task InvokeAsync(IPipeContext context, CancellationToken token)
{
var msgContentType = GetMessageContentType(context);
if (!CanSerializeMessage(msgContentType))
{
throw new SerializationException($"Registered serializer supports {Serializer.ContentType}, recieved message uses {msgContentType}.");
}
var message = GetMessage(context);
SaveInContext(context, message);
await Next.InvokeAsync(context, token);
}

protected virtual object GetMessage(IPipeContext context)
protected virtual bool CanSerializeMessage(string msgContentType)
{
var serialized = GetSerializedMessage(context);
var messageType = GetMessageType(context);
return Serializer.Deserialize(messageType, serialized);
if (string.IsNullOrEmpty(msgContentType))
{
_logger.Debug("Recieved message has no content type defined. Assuming it can be processed.");
return true;
}
return string.Equals(msgContentType, Serializer.ContentType, StringComparison.CurrentCultureIgnoreCase);
}

protected virtual string GetSerializedMessage(IPipeContext context)
protected virtual string GetMessageContentType(IPipeContext context)
{
return BodyContentTypeFunc?.Invoke(context);
}

protected virtual object GetMessage(IPipeContext context)
{
var bodyBytes = GetBodyBytes(context);
var messageType = GetMessageType(context);
Expand Down
142 changes: 142 additions & 0 deletions test/RawRabbit.IntegrationTests/Enrichers/ProtobufTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
using System;
using System.IO;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using ProtoBuf;
using RawRabbit.Exceptions;
using RawRabbit.Instantiation;
using RawRabbit.Pipe;
using Xunit;

namespace RawRabbit.IntegrationTests.Enrichers
{
public class ProtobufTests
{
[Fact]
public async Task Should_Deliver_And_Recieve_Messages_Serialized_With_Protobuf()
{
using (var client = RawRabbitFactory.CreateTestClient(new RawRabbitOptions{ Plugins = p => p.UseProtobuf() }))
{
/** Setup **/
var tcs = new TaskCompletionSource<ProtoMessage>();
var message = new ProtoMessage
{
Member = "Straight into bytes",
Id = Guid.NewGuid()
};
await client.SubscribeAsync<ProtoMessage>(msg =>
{
tcs.TrySetResult(msg);
return Task.CompletedTask;
});

/** Test **/
await client.PublishAsync(message);
await tcs.Task;

/** Assert **/
Assert.Equal(tcs.Task.Result.Id, message.Id);
Assert.Equal(tcs.Task.Result.Member, message.Member);
}
}

[Fact]
public async Task Should_Perform_Rpc_With_Messages_Serialized_With_Protobuf()
{
using (var client = RawRabbitFactory.CreateTestClient(new RawRabbitOptions {Plugins = p => p.UseProtobuf()}))
{
/* Setup */
var response = new ProtoResponse {Id = Guid.NewGuid()};
await client.RespondAsync<ProtoRequest, ProtoResponse>(request => Task.FromResult(response));

/* Test */
var recieved = await client.RequestAsync<ProtoRequest, ProtoResponse>(new ProtoRequest());

/* Assert */
Assert.Equal(recieved.Id, response.Id);
}
}

[Fact]
public async Task Should_Publish_Message_To_Error_Exchange_If_Serializer_Mismatch()
{
using (var protobufClient = RawRabbitFactory.CreateTestClient(new RawRabbitOptions { Plugins = p => p.UseProtobuf() }))
using (var jsonClient = RawRabbitFactory.CreateTestClient())
{
/** Setup **/
var handlerInvoked = false;
var tcs = new TaskCompletionSource<ProtoMessage>();
var message = new ProtoMessage
{
Member = "Straight into bytes",
Id = Guid.NewGuid()
};
await jsonClient.SubscribeAsync<ProtoMessage>(msg =>
{
handlerInvoked = true; // Should never get here
return Task.CompletedTask;
});
await protobufClient.SubscribeAsync<ProtoMessage>(msg =>
{
tcs.TrySetResult(msg);
return Task.CompletedTask;
}, ctx => ctx.UseConsumerConfiguration(cfg => cfg
.FromDeclaredQueue(q => q.WithName("error_queue"))
.OnDeclaredExchange(e => e.WithName("default_error_exchange"))
));

/** Test **/
await protobufClient.PublishAsync(message);
await tcs.Task;

/** Assert **/
Assert.False(handlerInvoked);
Assert.Equal(tcs.Task.Result.Id, message.Id);
Assert.Equal(tcs.Task.Result.Member, message.Member);
}
}

[Fact]
public async Task Should_Throw_Exception_If_Responder_Can_Not_Deserialize_Request()
{
using (var protobufClient = RawRabbitFactory.CreateTestClient(new RawRabbitOptions { Plugins = p => p.UseProtobuf() }))
using (var jsonClient = RawRabbitFactory.CreateTestClient())
{
/* Setup */
await jsonClient.RespondAsync<ProtoRequest, ProtoResponse>(request =>
Task.FromResult(new ProtoResponse())
);

/* Test */
/* Assert */
var e = await Assert.ThrowsAsync<MessageHandlerException>(() =>
protobufClient.RequestAsync<ProtoRequest, ProtoResponse>(new ProtoRequest())
);
}
}
}

[ProtoContract]
public class ProtoMessage
{
[ProtoMember(1)]
public Guid Id { get; set; }

[ProtoMember(2)]
public string Member { get; set; }
}

[ProtoContract]
public class ProtoRequest
{
[ProtoMember(1)]
public Guid Id { get; set; }
}

[ProtoContract]
public class ProtoResponse
{
[ProtoMember(1)]
public Guid Id { get; set; }
}
}
Loading

0 comments on commit 15c5b52

Please sign in to comment.