From 7a8ec041139fcd57cda4acfea4344b891076832a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A4r=20Dahlman?= Date: Wed, 15 Nov 2017 21:14:28 +0100 Subject: [PATCH] Simplify ack management in pipe --- .../RespondMessageContextExtension.cs | 2 +- .../SubscribeMessageContextExtension.cs | 2 +- .../MessageAndContextTriggerExtension.cs | 2 +- .../Core/PipeContextExtensions.cs | 5 ---- .../Middleware/RespondExceptionMiddleware.cs | 2 +- .../ResponseHandlerOptionFactory.cs | 16 ++---------- .../RespondExtension.cs | 15 +++++------ .../Trigger/TriggerFromMessageExtension.cs | 2 +- .../SubscribeMessageExtension.cs | 12 ++++----- .../BasicConsumeExtension.cs | 2 +- .../Pipe/Middleware/ExplicitAckMiddleware.cs | 10 +++---- .../Middleware/HandlerInvocationMiddleware.cs | 26 +++++++------------ src/RawRabbit/Pipe/PipeContextExtension.cs | 11 +++++--- src/RawRabbit/Pipe/PipeKey.cs | 1 + .../GetOperation/BasicGetTests.cs | 1 - .../Rpc/RpcFundamentalTests.cs | 2 -- 16 files changed, 43 insertions(+), 68 deletions(-) diff --git a/src/RawRabbit.Enrichers.MessageContext.Respond/RespondMessageContextExtension.cs b/src/RawRabbit.Enrichers.MessageContext.Respond/RespondMessageContextExtension.cs index 196083a7..dada3664 100644 --- a/src/RawRabbit.Enrichers.MessageContext.Respond/RespondMessageContextExtension.cs +++ b/src/RawRabbit.Enrichers.MessageContext.Respond/RespondMessageContextExtension.cs @@ -68,7 +68,7 @@ public static Task RespondAsync { - Func genericHandler = args => (handler((TRequest) args[0], (TMessageContext) args[1]) + Func> genericHandler = args => (handler((TRequest) args[0], (TMessageContext) args[1]) .ContinueWith(tResponse => { if (tResponse.IsFaulted) diff --git a/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs b/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs index 25360958..f63ea3aa 100644 --- a/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs +++ b/src/RawRabbit.Enrichers.MessageContext.Subscribe/SubscribeMessageContextExtension.cs @@ -64,7 +64,7 @@ public static Task SubscribeAsync( SubscribePipe, ctx => { - Func genericHandler = args => subscribeMethod((TMessage)args[0], (TMessageContext)args[1]); + Func> genericHandler = args => subscribeMethod((TMessage)args[0], (TMessageContext)args[1]); ctx.Properties.TryAdd(PipeKey.MessageType, typeof(TMessage)); if (!ctx.Properties.ContainsKey(PipeContextExtensions.PipebasedContextFunc)) diff --git a/src/RawRabbit.Operations.MessageSequence/Trigger/MessageAndContextTriggerExtension.cs b/src/RawRabbit.Operations.MessageSequence/Trigger/MessageAndContextTriggerExtension.cs index 11c667af..9888f4d5 100644 --- a/src/RawRabbit.Operations.MessageSequence/Trigger/MessageAndContextTriggerExtension.cs +++ b/src/RawRabbit.Operations.MessageSequence/Trigger/MessageAndContextTriggerExtension.cs @@ -45,7 +45,7 @@ public static TriggerConfigurer FromMessage consumeConfig = null ) { - Func genericHandler = args => + Func> genericHandler = args => machineFunc((TStateMachine) args[0], (TMessage) args[1], (TMessageContext)args[2]) .ContinueWith(t => new Ack()); Func genericCorrFunc = (msg, ctx) => correlationFunc((TMessage) msg, (TMessageContext)ctx); diff --git a/src/RawRabbit.Operations.Request/Core/PipeContextExtensions.cs b/src/RawRabbit.Operations.Request/Core/PipeContextExtensions.cs index 1d477146..02d97bbd 100644 --- a/src/RawRabbit.Operations.Request/Core/PipeContextExtensions.cs +++ b/src/RawRabbit.Operations.Request/Core/PipeContextExtensions.cs @@ -33,11 +33,6 @@ public static Type GetRequestMessageType(this IPipeContext context) return context.Get(RequestKey.OutgoingMessageType); } - public static Func> GetMessageHandler(this IPipeContext context) - { - return context.Get>>(PipeKey.MessageHandler); - } - public static PublicationAddress GetPublicationAddress(this IPipeContext context) { return context.Get(RequestKey.PublicationAddress); diff --git a/src/RawRabbit.Operations.Respond/Middleware/RespondExceptionMiddleware.cs b/src/RawRabbit.Operations.Respond/Middleware/RespondExceptionMiddleware.cs index db22f0c6..77766189 100644 --- a/src/RawRabbit.Operations.Respond/Middleware/RespondExceptionMiddleware.cs +++ b/src/RawRabbit.Operations.Respond/Middleware/RespondExceptionMiddleware.cs @@ -52,7 +52,7 @@ protected virtual void AddAcknowledgementToContext(IPipeContext context, Consume { return; } - context.Properties.TryAdd(PipeKey.MessageHandlerResult, Task.FromResult(new Ack())); + context.Properties.TryAdd(PipeKey.MessageAcknowledgement, new Ack()); } protected virtual BasicDeliverEventArgs GetDeliveryArgs(IPipeContext context) diff --git a/src/RawRabbit.Operations.Respond/Middleware/ResponseHandlerOptionFactory.cs b/src/RawRabbit.Operations.Respond/Middleware/ResponseHandlerOptionFactory.cs index 7c2e611d..87a076b7 100644 --- a/src/RawRabbit.Operations.Respond/Middleware/ResponseHandlerOptionFactory.cs +++ b/src/RawRabbit.Operations.Respond/Middleware/ResponseHandlerOptionFactory.cs @@ -1,5 +1,4 @@ -using System.Threading.Tasks; -using RawRabbit.Operations.Respond.Acknowledgement; +using RawRabbit.Operations.Respond.Acknowledgement; using RawRabbit.Operations.Respond.Core; using RawRabbit.Pipe; using RawRabbit.Pipe.Middleware; @@ -15,18 +14,7 @@ public static HandlerInvocationOptions Create(HandlerInvocationOptions options = HandlerArgsFunc = options?.HandlerArgsFunc ?? (context => new[] {context.GetMessage()}), PostInvokeAction = options?.PostInvokeAction ?? ((context, task) => { - if (task.IsFaulted) - { - return; - } - if (task is Task responseTask) - { - context.Properties.TryAdd(RespondKey.ResponseMessage, responseTask.Result); - return; - } - - var ackTask = task as Task; - if (ackTask?.Result is Ack ack) + if (task is Ack ack) { context.Properties.TryAdd(RespondKey.ResponseMessage, ack.Response); } diff --git a/src/RawRabbit.Operations.Respond/RespondExtension.cs b/src/RawRabbit.Operations.Respond/RespondExtension.cs index 6b8b53f0..74451b30 100644 --- a/src/RawRabbit.Operations.Respond/RespondExtension.cs +++ b/src/RawRabbit.Operations.Respond/RespondExtension.cs @@ -76,14 +76,11 @@ public static Task RespondAsync( Action context = null, CancellationToken ct = default(CancellationToken)) { - return client.RespondAsync(request => handler - .Invoke(request) - .ContinueWith>(t => - { - if (t.IsFaulted) - throw t.Exception; - return new Ack(t.Result); - }, ct), + return client.RespondAsync(async request => + { + var response = await handler(request); + return new Ack(response); + }, context, ct); } @@ -99,7 +96,7 @@ public static Task RespondAsync( RespondPipe, ctx => { - Func genericHandler = args => handler((TRequest)args[0]) + Func> genericHandler = args => handler((TRequest)args[0]) .ContinueWith(tResponse => { if (tResponse.IsFaulted) diff --git a/src/RawRabbit.Operations.StateMachine/Trigger/TriggerFromMessageExtension.cs b/src/RawRabbit.Operations.StateMachine/Trigger/TriggerFromMessageExtension.cs index 3985b214..90220a66 100644 --- a/src/RawRabbit.Operations.StateMachine/Trigger/TriggerFromMessageExtension.cs +++ b/src/RawRabbit.Operations.StateMachine/Trigger/TriggerFromMessageExtension.cs @@ -40,7 +40,7 @@ public static TriggerConfigurer FromMessage( Action consumeConfig = null ) { - Func genericHandler = args => machineFunc((TStateMachine)args[0], (TMessage)args[1]).ContinueWith(t => new Ack()); + Func> genericHandler = args => machineFunc((TStateMachine)args[0], (TMessage)args[1]).ContinueWith(t => new Ack()); Func genericCorrFunc = args => correlationFunc((TMessage)args[0]); return configurer.From(SubscribePipe, context => diff --git a/src/RawRabbit.Operations.Subscribe/SubscribeMessageExtension.cs b/src/RawRabbit.Operations.Subscribe/SubscribeMessageExtension.cs index b259b764..c68f4b57 100644 --- a/src/RawRabbit.Operations.Subscribe/SubscribeMessageExtension.cs +++ b/src/RawRabbit.Operations.Subscribe/SubscribeMessageExtension.cs @@ -42,11 +42,11 @@ public static class SubscribeMessageExtension public static Task SubscribeAsync(this IBusClient client, Func subscribeMethod, Action context = null, CancellationToken ct = default(CancellationToken)) { - return client.SubscribeAsync( - message => subscribeMethod - .Invoke(message) - .ContinueWith(t => new Ack(), ct), - context, ct); + return client.SubscribeAsync(async message => + { + await subscribeMethod(message); + return new Ack(); + }, context, ct); } public static Task SubscribeAsync(this IBusClient client, Func> subscribeMethod, Action context = null, CancellationToken ct = default(CancellationToken)) @@ -55,7 +55,7 @@ public static class SubscribeMessageExtension SubscribePipe, ctx => { - Func genericHandler = args => subscribeMethod((TMessage) args[0]); + Func> genericHandler = args => subscribeMethod((TMessage) args[0]); ctx.Properties.TryAdd(PipeKey.MessageType, typeof(TMessage)); ctx.Properties.TryAdd(PipeKey.MessageHandler, genericHandler); diff --git a/src/RawRabbit.Operations.Tools/BasicConsumeExtension.cs b/src/RawRabbit.Operations.Tools/BasicConsumeExtension.cs index 0f54afa6..77a915ee 100644 --- a/src/RawRabbit.Operations.Tools/BasicConsumeExtension.cs +++ b/src/RawRabbit.Operations.Tools/BasicConsumeExtension.cs @@ -12,7 +12,7 @@ public static class BasicConsumeExtension public static Task BasicConsumeAsync(this IBusClient busClient, Func> consumeFunc, Action context) { - Func genericFunc = args => consumeFunc(args[0] as BasicDeliverEventArgs); + Func> genericFunc = args => consumeFunc(args[0] as BasicDeliverEventArgs); return busClient .InvokeAsync(pipe => diff --git a/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs b/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs index 80119307..4f8f3b9e 100644 --- a/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs +++ b/src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs @@ -18,7 +18,7 @@ public class ExplicitAckOptions public Func ConsumerFunc { get; set; } public Func DeliveryArgsFunc { get; set; } public Func AutoAckFunc { get; set; } - public Func InvocationResultFunc { get; set; } + public Func GetMessageAcknowledgement { get; set; } public Predicate AbortExecution { get; set; } } @@ -29,7 +29,7 @@ public class ExplicitAckMiddleware : Middleware protected readonly IChannelFactory ChannelFactory; protected Func DeliveryArgsFunc; protected Func ConsumerFunc; - protected Func InvocationResultFunc; + protected Func MessageAcknowledgementFunc; protected Predicate AbortExecution; protected Func AutoAckFunc; @@ -40,7 +40,7 @@ public ExplicitAckMiddleware(INamingConventions conventions, ITopologyProvider t ChannelFactory = channelFactory; DeliveryArgsFunc = options?.DeliveryArgsFunc ?? (context => context.GetDeliveryEventArgs()); ConsumerFunc = options?.ConsumerFunc ?? (context => context.GetConsumer()); - InvocationResultFunc = options?.InvocationResultFunc ?? (context => context.GetMessageHandlerResult()); + MessageAcknowledgementFunc = options?.GetMessageAcknowledgement ?? (context => context.GetMessageAcknowledgement()); AbortExecution = options?.AbortExecution ?? (ack => !(ack is Ack)); AutoAckFunc = options?.AutoAckFunc ?? (context => context.GetConsumeConfiguration().AutoAck); } @@ -61,10 +61,10 @@ public override async Task InvokeAsync(IPipeContext context, CancellationToken t protected virtual async Task AcknowledgeMessageAsync(IPipeContext context) { - var ack = (InvocationResultFunc(context) as Task)?.Result; + var ack = MessageAcknowledgementFunc(context); if (ack == null) { - throw new NotSupportedException($"Invocation Result of Message Handler not found."); + throw new NotSupportedException("Invocation Result of Message Handler not found."); } var deliveryArgs = DeliveryArgsFunc(context); var channel = ConsumerFunc(context).Model; diff --git a/src/RawRabbit/Pipe/Middleware/HandlerInvocationMiddleware.cs b/src/RawRabbit/Pipe/Middleware/HandlerInvocationMiddleware.cs index e1c42562..588b51f8 100644 --- a/src/RawRabbit/Pipe/Middleware/HandlerInvocationMiddleware.cs +++ b/src/RawRabbit/Pipe/Middleware/HandlerInvocationMiddleware.cs @@ -1,21 +1,22 @@ using System; using System.Threading; using System.Threading.Tasks; +using RawRabbit.Common; namespace RawRabbit.Pipe.Middleware { public class HandlerInvocationOptions { - public Func> MessageHandlerFunc { get; set; } + public Func>> MessageHandlerFunc { get; set; } public Func HandlerArgsFunc { get; set; } - public Action PostInvokeAction { get; set; } + public Action PostInvokeAction { get; set; } } public class HandlerInvocationMiddleware : Middleware { protected Func HandlerArgsFunc; - protected Action PostInvokeAction; - protected Func> MessageHandlerFunc; + protected Action PostInvokeAction; + protected Func>> MessageHandlerFunc; public HandlerInvocationMiddleware(HandlerInvocationOptions options = null) { @@ -30,23 +31,14 @@ public override async Task InvokeAsync(IPipeContext context, CancellationToken t await Next.InvokeAsync(context, token); } - protected virtual Task InvokeMessageHandler(IPipeContext context, CancellationToken token) + protected virtual async Task InvokeMessageHandler(IPipeContext context, CancellationToken token) { var args = HandlerArgsFunc(context); var handler = MessageHandlerFunc(context); - return handler(args) - .ContinueWith(t => - { - if (t.IsFaulted) - { - throw t.Exception; - } - context.Properties.TryAdd(PipeKey.MessageHandlerResult, t); - PostInvokeAction?.Invoke(context, t); - return t; - }, token) - .Unwrap(); + var acknowledgement = await handler(args); + context.Properties.TryAdd(PipeKey.MessageAcknowledgement, acknowledgement); + PostInvokeAction?.Invoke(context, acknowledgement); } } } diff --git a/src/RawRabbit/Pipe/PipeContextExtension.cs b/src/RawRabbit/Pipe/PipeContextExtension.cs index c87f2c38..4392409c 100644 --- a/src/RawRabbit/Pipe/PipeContextExtension.cs +++ b/src/RawRabbit/Pipe/PipeContextExtension.cs @@ -3,6 +3,7 @@ using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; +using RawRabbit.Common; using RawRabbit.Configuration; using RawRabbit.Configuration.BasicPublish; using RawRabbit.Configuration.Consume; @@ -106,10 +107,9 @@ public static BasicDeliverEventArgs GetDeliveryEventArgs(this IPipeContext conte return context.Get(PipeKey.DeliveryEventArgs); } - - public static Func GetMessageHandler(this IPipeContext context) + public static Func> GetMessageHandler(this IPipeContext context) { - return context.Get>(PipeKey.MessageHandler); + return context.Get>>(PipeKey.MessageHandler); } public static object[] GetMessageHandlerArgs(this IPipeContext context) @@ -122,6 +122,11 @@ public static Task GetMessageHandlerResult(this IPipeContext context) return context.Get(PipeKey.MessageHandlerResult); } + public static Acknowledgement GetMessageAcknowledgement(this IPipeContext context) + { + return context.Get(PipeKey.MessageAcknowledgement); + } + public static RawRabbitConfiguration GetClientConfiguration(this IPipeContext context) { return context.Get(PipeKey.ClientConfiguration); diff --git a/src/RawRabbit/Pipe/PipeKey.cs b/src/RawRabbit/Pipe/PipeKey.cs index fd622743..907e6e83 100644 --- a/src/RawRabbit/Pipe/PipeKey.cs +++ b/src/RawRabbit/Pipe/PipeKey.cs @@ -13,6 +13,7 @@ public static class PipeKey public const string MessageHandler = "MessageHandler"; public const string MessageHandlerArgs = "MessageHandlerArgs"; public const string MessageHandlerResult = "MessageHandlerResult"; + public const string MessageAcknowledgement = "MessageAcknowledgement"; public const string BasicProperties = "BasicProperties"; public const string ConfigurationAction = "ConfigurationAction"; public const string DeliveryEventArgs = "DeliveryEventArgs"; diff --git a/test/RawRabbit.IntegrationTests/GetOperation/BasicGetTests.cs b/test/RawRabbit.IntegrationTests/GetOperation/BasicGetTests.cs index 78c022df..1056cc6a 100644 --- a/test/RawRabbit.IntegrationTests/GetOperation/BasicGetTests.cs +++ b/test/RawRabbit.IntegrationTests/GetOperation/BasicGetTests.cs @@ -2,7 +2,6 @@ using RabbitMQ.Client; using RawRabbit.Common; using RawRabbit.IntegrationTests.TestMessages; -using RawRabbit.Pipe; using Xunit; namespace RawRabbit.IntegrationTests.GetOperation diff --git a/test/RawRabbit.IntegrationTests/Rpc/RpcFundamentalTests.cs b/test/RawRabbit.IntegrationTests/Rpc/RpcFundamentalTests.cs index 716938d4..08784326 100644 --- a/test/RawRabbit.IntegrationTests/Rpc/RpcFundamentalTests.cs +++ b/test/RawRabbit.IntegrationTests/Rpc/RpcFundamentalTests.cs @@ -2,9 +2,7 @@ using System.Linq; using System.Threading.Tasks; using RawRabbit.IntegrationTests.TestMessages; -using RawRabbit.Operations.Request.Core; using RawRabbit.Operations.Request.Middleware; -using RawRabbit.Operations.Respond.Core; using Xunit; namespace RawRabbit.IntegrationTests.Rpc