Skip to content

Commit

Permalink
Simplify ack management in pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
pardahlman committed Nov 15, 2017
1 parent 885a1ca commit 7a8ec04
Show file tree
Hide file tree
Showing 16 changed files with 43 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static Task<IPipeContext> RespondAsync<TRequest, TResponse, TMessageConte
return client
.InvokeAsync(RespondPipe, ctx =>
{
Func<object[], Task> genericHandler = args => (handler((TRequest) args[0], (TMessageContext) args[1])
Func<object[], Task<Acknowledgement>> genericHandler = args => (handler((TRequest) args[0], (TMessageContext) args[1])
.ContinueWith(tResponse =>
{
if (tResponse.IsFaulted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static Task<IPipeContext> SubscribeAsync<TMessage, TMessageContext>(
SubscribePipe,
ctx =>
{
Func<object[], Task> genericHandler = args => subscribeMethod((TMessage)args[0], (TMessageContext)args[1]);
Func<object[], Task<Acknowledgement>> genericHandler = args => subscribeMethod((TMessage)args[0], (TMessageContext)args[1]);

ctx.Properties.TryAdd(PipeKey.MessageType, typeof(TMessage));
if (!ctx.Properties.ContainsKey(PipeContextExtensions.PipebasedContextFunc))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static TriggerConfigurer FromMessage<TStateMachine, TMessage, TMessageCon
Action<IConsumerConfigurationBuilder> consumeConfig = null
)
{
Func<object[], Task> genericHandler = args =>
Func<object[], Task<Acknowledgement>> genericHandler = args =>
machineFunc((TStateMachine) args[0], (TMessage) args[1], (TMessageContext)args[2])
.ContinueWith<Acknowledgement>(t => new Ack());
Func<object, object, Guid> genericCorrFunc = (msg, ctx) => correlationFunc((TMessage) msg, (TMessageContext)ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ public static Type GetRequestMessageType(this IPipeContext context)
return context.Get<Type>(RequestKey.OutgoingMessageType);
}

public static Func<object, Task<object>> GetMessageHandler(this IPipeContext context)
{
return context.Get<Func<object, Task<object>>>(PipeKey.MessageHandler);
}

public static PublicationAddress GetPublicationAddress(this IPipeContext context)
{
return context.Get<PublicationAddress>(RequestKey.PublicationAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected virtual void AddAcknowledgementToContext(IPipeContext context, Consume
{
return;
}
context.Properties.TryAdd(PipeKey.MessageHandlerResult, Task.FromResult<Common.Acknowledgement>(new Ack()));
context.Properties.TryAdd(PipeKey.MessageAcknowledgement, new Ack());
}

protected virtual BasicDeliverEventArgs GetDeliveryArgs(IPipeContext context)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using RawRabbit.Operations.Respond.Acknowledgement;
using RawRabbit.Operations.Respond.Acknowledgement;
using RawRabbit.Operations.Respond.Core;
using RawRabbit.Pipe;
using RawRabbit.Pipe.Middleware;
Expand All @@ -15,18 +14,7 @@ public static HandlerInvocationOptions Create(HandlerInvocationOptions options =
HandlerArgsFunc = options?.HandlerArgsFunc ?? (context => new[] {context.GetMessage()}),
PostInvokeAction = options?.PostInvokeAction ?? ((context, task) =>
{
if (task.IsFaulted)
{
return;
}
if (task is Task<object> responseTask)
{
context.Properties.TryAdd(RespondKey.ResponseMessage, responseTask.Result);
return;
}

var ackTask = task as Task<Common.Acknowledgement>;
if (ackTask?.Result is Ack ack)
if (task is Ack ack)
{
context.Properties.TryAdd(RespondKey.ResponseMessage, ack.Response);
}
Expand Down
15 changes: 6 additions & 9 deletions src/RawRabbit.Operations.Respond/RespondExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,11 @@ public static Task RespondAsync<TRequest, TResponse>(
Action<IRespondContext> context = null,
CancellationToken ct = default(CancellationToken))
{
return client.RespondAsync<TRequest, TResponse>(request => handler
.Invoke(request)
.ContinueWith<TypedAcknowlegement<TResponse>>(t =>
{
if (t.IsFaulted)
throw t.Exception;
return new Ack<TResponse>(t.Result);
}, ct),
return client.RespondAsync<TRequest, TResponse>(async request =>
{
var response = await handler(request);
return new Ack<TResponse>(response);
},
context,
ct);
}
Expand All @@ -99,7 +96,7 @@ public static Task RespondAsync<TRequest, TResponse>(
RespondPipe,
ctx =>
{
Func<object[], Task> genericHandler = args => handler((TRequest)args[0])
Func<object[], Task<Acknowledgement>> genericHandler = args => handler((TRequest)args[0])
.ContinueWith(tResponse =>
{
if (tResponse.IsFaulted)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public static TriggerConfigurer FromMessage<TStateMachine, TMessage>(
Action<IConsumerConfigurationBuilder> consumeConfig = null
)
{
Func<object[], Task> genericHandler = args => machineFunc((TStateMachine)args[0], (TMessage)args[1]).ContinueWith<Acknowledgement>(t => new Ack());
Func<object[], Task<Acknowledgement>> genericHandler = args => machineFunc((TStateMachine)args[0], (TMessage)args[1]).ContinueWith<Acknowledgement>(t => new Ack());
Func<object[], Guid> genericCorrFunc = args => correlationFunc((TMessage)args[0]);

return configurer.From(SubscribePipe, context =>
Expand Down
12 changes: 6 additions & 6 deletions src/RawRabbit.Operations.Subscribe/SubscribeMessageExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ public static class SubscribeMessageExtension

public static Task SubscribeAsync<TMessage>(this IBusClient client, Func<TMessage, Task> subscribeMethod, Action<ISubscribeContext> context = null, CancellationToken ct = default(CancellationToken))
{
return client.SubscribeAsync<TMessage>(
message => subscribeMethod
.Invoke(message)
.ContinueWith<Acknowledgement>(t => new Ack(), ct),
context, ct);
return client.SubscribeAsync<TMessage>(async message =>
{
await subscribeMethod(message);
return new Ack();
}, context, ct);
}

public static Task SubscribeAsync<TMessage>(this IBusClient client, Func<TMessage, Task<Acknowledgement>> subscribeMethod, Action<ISubscribeContext> context = null, CancellationToken ct = default(CancellationToken))
Expand All @@ -55,7 +55,7 @@ public static class SubscribeMessageExtension
SubscribePipe,
ctx =>
{
Func<object[], Task> genericHandler = args => subscribeMethod((TMessage) args[0]);
Func<object[], Task<Acknowledgement>> genericHandler = args => subscribeMethod((TMessage) args[0]);

ctx.Properties.TryAdd(PipeKey.MessageType, typeof(TMessage));
ctx.Properties.TryAdd(PipeKey.MessageHandler, genericHandler);
Expand Down
2 changes: 1 addition & 1 deletion src/RawRabbit.Operations.Tools/BasicConsumeExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public static class BasicConsumeExtension
public static Task BasicConsumeAsync(this IBusClient busClient, Func<BasicDeliverEventArgs, Task<Acknowledgement>> consumeFunc,
Action<IPipeContext> context)
{
Func<object[], Task> genericFunc = args => consumeFunc(args[0] as BasicDeliverEventArgs);
Func<object[], Task<Acknowledgement>> genericFunc = args => consumeFunc(args[0] as BasicDeliverEventArgs);

return busClient
.InvokeAsync(pipe =>
Expand Down
10 changes: 5 additions & 5 deletions src/RawRabbit/Pipe/Middleware/ExplicitAckMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class ExplicitAckOptions
public Func<IPipeContext, IBasicConsumer> ConsumerFunc { get; set; }
public Func<IPipeContext, BasicDeliverEventArgs> DeliveryArgsFunc { get; set; }
public Func<IPipeContext, bool> AutoAckFunc { get; set; }
public Func<IPipeContext, Task> InvocationResultFunc { get; set; }
public Func<IPipeContext, Acknowledgement> GetMessageAcknowledgement { get; set; }
public Predicate<Acknowledgement> AbortExecution { get; set; }
}

Expand All @@ -29,7 +29,7 @@ public class ExplicitAckMiddleware : Middleware
protected readonly IChannelFactory ChannelFactory;
protected Func<IPipeContext, BasicDeliverEventArgs> DeliveryArgsFunc;
protected Func<IPipeContext, IBasicConsumer> ConsumerFunc;
protected Func<IPipeContext, Task> InvocationResultFunc;
protected Func<IPipeContext, Acknowledgement> MessageAcknowledgementFunc;
protected Predicate<Acknowledgement> AbortExecution;
protected Func<IPipeContext, bool> AutoAckFunc;

Expand All @@ -40,7 +40,7 @@ public ExplicitAckMiddleware(INamingConventions conventions, ITopologyProvider t
ChannelFactory = channelFactory;
DeliveryArgsFunc = options?.DeliveryArgsFunc ?? (context => context.GetDeliveryEventArgs());
ConsumerFunc = options?.ConsumerFunc ?? (context => context.GetConsumer());
InvocationResultFunc = options?.InvocationResultFunc ?? (context => context.GetMessageHandlerResult());
MessageAcknowledgementFunc = options?.GetMessageAcknowledgement ?? (context => context.GetMessageAcknowledgement());
AbortExecution = options?.AbortExecution ?? (ack => !(ack is Ack));
AutoAckFunc = options?.AutoAckFunc ?? (context => context.GetConsumeConfiguration().AutoAck);
}
Expand All @@ -61,10 +61,10 @@ public override async Task InvokeAsync(IPipeContext context, CancellationToken t

protected virtual async Task<Acknowledgement> AcknowledgeMessageAsync(IPipeContext context)
{
var ack = (InvocationResultFunc(context) as Task<Acknowledgement>)?.Result;
var ack = MessageAcknowledgementFunc(context);
if (ack == null)
{
throw new NotSupportedException($"Invocation Result of Message Handler not found.");
throw new NotSupportedException("Invocation Result of Message Handler not found.");
}
var deliveryArgs = DeliveryArgsFunc(context);
var channel = ConsumerFunc(context).Model;
Expand Down
26 changes: 9 additions & 17 deletions src/RawRabbit/Pipe/Middleware/HandlerInvocationMiddleware.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using RawRabbit.Common;

namespace RawRabbit.Pipe.Middleware
{
public class HandlerInvocationOptions
{
public Func<IPipeContext, Func<object[], Task>> MessageHandlerFunc { get; set; }
public Func<IPipeContext, Func<object[], Task<Acknowledgement>>> MessageHandlerFunc { get; set; }
public Func<IPipeContext, object[]> HandlerArgsFunc { get; set; }
public Action<IPipeContext, Task> PostInvokeAction { get; set; }
public Action<IPipeContext, Acknowledgement> PostInvokeAction { get; set; }
}

public class HandlerInvocationMiddleware : Middleware
{
protected Func<IPipeContext, object[]> HandlerArgsFunc;
protected Action<IPipeContext, Task> PostInvokeAction;
protected Func<IPipeContext, Func<object[], Task>> MessageHandlerFunc;
protected Action<IPipeContext, Acknowledgement> PostInvokeAction;
protected Func<IPipeContext, Func<object[], Task<Acknowledgement>>> MessageHandlerFunc;

public HandlerInvocationMiddleware(HandlerInvocationOptions options = null)
{
Expand All @@ -30,23 +31,14 @@ public override async Task InvokeAsync(IPipeContext context, CancellationToken t
await Next.InvokeAsync(context, token);
}

protected virtual Task InvokeMessageHandler(IPipeContext context, CancellationToken token)
protected virtual async Task InvokeMessageHandler(IPipeContext context, CancellationToken token)
{
var args = HandlerArgsFunc(context);
var handler = MessageHandlerFunc(context);

return handler(args)
.ContinueWith(t =>
{
if (t.IsFaulted)
{
throw t.Exception;
}
context.Properties.TryAdd(PipeKey.MessageHandlerResult, t);
PostInvokeAction?.Invoke(context, t);
return t;
}, token)
.Unwrap();
var acknowledgement = await handler(args);
context.Properties.TryAdd(PipeKey.MessageAcknowledgement, acknowledgement);
PostInvokeAction?.Invoke(context, acknowledgement);
}
}
}
11 changes: 8 additions & 3 deletions src/RawRabbit/Pipe/PipeContextExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RawRabbit.Common;
using RawRabbit.Configuration;
using RawRabbit.Configuration.BasicPublish;
using RawRabbit.Configuration.Consume;
Expand Down Expand Up @@ -106,10 +107,9 @@ public static BasicDeliverEventArgs GetDeliveryEventArgs(this IPipeContext conte
return context.Get<BasicDeliverEventArgs>(PipeKey.DeliveryEventArgs);
}


public static Func<object[], Task> GetMessageHandler(this IPipeContext context)
public static Func<object[], Task<Acknowledgement>> GetMessageHandler(this IPipeContext context)
{
return context.Get<Func<object[], Task>>(PipeKey.MessageHandler);
return context.Get<Func<object[], Task<Acknowledgement>>>(PipeKey.MessageHandler);
}

public static object[] GetMessageHandlerArgs(this IPipeContext context)
Expand All @@ -122,6 +122,11 @@ public static Task GetMessageHandlerResult(this IPipeContext context)
return context.Get<Task>(PipeKey.MessageHandlerResult);
}

public static Acknowledgement GetMessageAcknowledgement(this IPipeContext context)
{
return context.Get<Acknowledgement>(PipeKey.MessageAcknowledgement);
}

public static RawRabbitConfiguration GetClientConfiguration(this IPipeContext context)
{
return context.Get<RawRabbitConfiguration>(PipeKey.ClientConfiguration);
Expand Down
1 change: 1 addition & 0 deletions src/RawRabbit/Pipe/PipeKey.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ public static class PipeKey
public const string MessageHandler = "MessageHandler";
public const string MessageHandlerArgs = "MessageHandlerArgs";
public const string MessageHandlerResult = "MessageHandlerResult";
public const string MessageAcknowledgement = "MessageAcknowledgement";
public const string BasicProperties = "BasicProperties";
public const string ConfigurationAction = "ConfigurationAction";
public const string DeliveryEventArgs = "DeliveryEventArgs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using RabbitMQ.Client;
using RawRabbit.Common;
using RawRabbit.IntegrationTests.TestMessages;
using RawRabbit.Pipe;
using Xunit;

namespace RawRabbit.IntegrationTests.GetOperation
Expand Down
2 changes: 0 additions & 2 deletions test/RawRabbit.IntegrationTests/Rpc/RpcFundamentalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
using System.Linq;
using System.Threading.Tasks;
using RawRabbit.IntegrationTests.TestMessages;
using RawRabbit.Operations.Request.Core;
using RawRabbit.Operations.Request.Middleware;
using RawRabbit.Operations.Respond.Core;
using Xunit;

namespace RawRabbit.IntegrationTests.Rpc
Expand Down

0 comments on commit 7a8ec04

Please sign in to comment.