-
Notifications
You must be signed in to change notification settings - Fork 143
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
7a8ec04
commit 35ecde7
Showing
23 changed files
with
493 additions
and
95 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
using System; | ||
|
||
namespace RawRabbit.Common | ||
{ | ||
public class Retry : Acknowledgement | ||
{ | ||
public TimeSpan Span { get; set; } | ||
|
||
public Retry(TimeSpan span) | ||
{ | ||
Span = span; | ||
} | ||
|
||
public static Retry In(TimeSpan span) | ||
{ | ||
return new Retry(span); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
namespace RawRabbit.Common | ||
{ | ||
public class RetryHeaders | ||
{ | ||
public const string NumberOfRetries = "x-number-of-retries"; | ||
public const string OriginalDelivered = "x-original-delivered"; | ||
} | ||
} |
10 changes: 10 additions & 0 deletions
10
src/RawRabbit.Enrichers.RetryLater/Common/RetryInformation.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
using System; | ||
|
||
namespace RawRabbit.Common | ||
{ | ||
public class RetryInformation | ||
{ | ||
public int NumberOfRetries { get; set; } | ||
public DateTime OriginalDelivered { get; set; } | ||
} | ||
} |
68 changes: 68 additions & 0 deletions
68
src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationHeaderUpdater.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using RabbitMQ.Client.Events; | ||
|
||
namespace RawRabbit.Common | ||
{ | ||
public interface IRetryInformationHeaderUpdater | ||
{ | ||
void AddOrUpdate(BasicDeliverEventArgs args); | ||
void AddOrUpdate(BasicDeliverEventArgs args, RetryInformation retryInfo); | ||
} | ||
|
||
public class RetryInformationHeaderUpdater : IRetryInformationHeaderUpdater | ||
{ | ||
public void AddOrUpdate(BasicDeliverEventArgs args) | ||
{ | ||
TryAddOriginalDelivered(args, DateTime.UtcNow); | ||
AddOrUpdateNumberOfRetries(args); | ||
} | ||
|
||
public void AddOrUpdate(BasicDeliverEventArgs args, RetryInformation retryInfo) | ||
{ | ||
TryAddOriginalDelivered(args, retryInfo.OriginalDelivered); | ||
AddOrUpdateNumberOfRetries(args); | ||
} | ||
|
||
private void AddOrUpdateNumberOfRetries(BasicDeliverEventArgs args) | ||
{ | ||
var currentRetry = 0; | ||
if (args.BasicProperties.Headers.ContainsKey(RetryHeaders.NumberOfRetries)) | ||
{ | ||
var valueStr = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.NumberOfRetries); | ||
currentRetry = int.Parse(valueStr); | ||
args.BasicProperties.Headers.Remove(RetryHeaders.NumberOfRetries); | ||
} | ||
var nextRetry = (++currentRetry).ToString(); | ||
args.BasicProperties.Headers.Add(RetryHeaders.NumberOfRetries, nextRetry); | ||
} | ||
|
||
private static void TryAddOriginalDelivered(BasicDeliverEventArgs args, DateTime originalDelivered) | ||
{ | ||
if (args.BasicProperties.Headers.ContainsKey(RetryHeaders.OriginalDelivered)) | ||
{ | ||
return; | ||
} | ||
args.BasicProperties.Headers.Add(RetryHeaders.OriginalDelivered, originalDelivered.ToString("u")); | ||
} | ||
|
||
private static string GetHeaderString(IDictionary<string, object> headers, string key) | ||
{ | ||
if (headers == null) | ||
{ | ||
return null; | ||
} | ||
if (!headers.ContainsKey(key)) | ||
{ | ||
return null; | ||
} | ||
if (!(headers[key] is byte[] headerBytes)) | ||
{ | ||
return null; | ||
} | ||
|
||
var headerStr = System.Text.Encoding.UTF8.GetString(headerBytes); | ||
return headerStr; | ||
} | ||
} | ||
} |
54 changes: 54 additions & 0 deletions
54
src/RawRabbit.Enrichers.RetryLater/Common/RetryInformationProvider.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using RabbitMQ.Client.Events; | ||
|
||
namespace RawRabbit.Common | ||
{ | ||
public interface IRetryInformationProvider | ||
{ | ||
RetryInformation Get(BasicDeliverEventArgs args); | ||
} | ||
|
||
public class RetryInformationProvider : IRetryInformationProvider | ||
{ | ||
public RetryInformation Get(BasicDeliverEventArgs args) | ||
{ | ||
return new RetryInformation | ||
{ | ||
NumberOfRetries = ExtractNumberOfRetries(args), | ||
OriginalDelivered = ExtractOriginalDelivered(args) | ||
}; | ||
} | ||
|
||
private DateTime ExtractOriginalDelivered(BasicDeliverEventArgs args) | ||
{ | ||
var headerValue = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.OriginalDelivered); | ||
return DateTime.TryParse(headerValue, out var originalSent) ? originalSent : DateTime.UtcNow; | ||
} | ||
|
||
private int ExtractNumberOfRetries(BasicDeliverEventArgs args) | ||
{ | ||
var headerValue = GetHeaderString(args.BasicProperties.Headers, RetryHeaders.NumberOfRetries); | ||
return int.TryParse(headerValue, out var noOfRetries) ? noOfRetries : 0; | ||
} | ||
|
||
private static string GetHeaderString(IDictionary<string, object> headers, string key) | ||
{ | ||
if (headers == null) | ||
{ | ||
return null; | ||
} | ||
if (!headers.ContainsKey(key)) | ||
{ | ||
return null; | ||
} | ||
if (!(headers[key] is byte[] headerBytes)) | ||
{ | ||
return null; | ||
} | ||
|
||
var headerStr = System.Text.Encoding.UTF8.GetString(headerBytes); | ||
return headerStr; | ||
} | ||
} | ||
} |
20 changes: 20 additions & 0 deletions
20
src/RawRabbit.Enrichers.RetryLater/Common/RetryLaterPipeContextExtensions.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
using RawRabbit.Pipe; | ||
|
||
namespace RawRabbit.Common | ||
{ | ||
public static class RetryLaterPipeContextExtensions | ||
{ | ||
private const string RetryInformationKey = "RetryInformation"; | ||
|
||
internal static IPipeContext AddRetryInformation(this IPipeContext context, RetryInformation retryInformation) | ||
{ | ||
context.Properties.TryAdd(RetryInformationKey, retryInformation); | ||
return context; | ||
} | ||
|
||
public static RetryInformation GetRetryInformation(this IPipeContext context) | ||
{ | ||
return context.Get<RetryInformation>(RetryInformationKey); | ||
} | ||
} | ||
} |
51 changes: 51 additions & 0 deletions
51
src/RawRabbit.Enrichers.RetryLater/Middleware/RetryInformationExtractionMiddleware.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using RabbitMQ.Client.Events; | ||
using RawRabbit.Common; | ||
using RawRabbit.Pipe; | ||
using RawRabbit.Pipe.Middleware; | ||
|
||
namespace RawRabbit.Middleware | ||
{ | ||
public class RetryInformationExtractionOptions | ||
{ | ||
public Func<IPipeContext, BasicDeliverEventArgs> DeliveryArgsFunc { get; set; } | ||
} | ||
|
||
public class RetryInformationExtractionMiddleware : StagedMiddleware | ||
{ | ||
private readonly IRetryInformationProvider _retryProvider; | ||
protected Func<IPipeContext, BasicDeliverEventArgs> DeliveryArgsFunc; | ||
public override string StageMarker => Pipe.StageMarker.MessageRecieved; | ||
|
||
public RetryInformationExtractionMiddleware(IRetryInformationProvider retryProvider, RetryInformationExtractionOptions options = null) | ||
{ | ||
_retryProvider = retryProvider; | ||
DeliveryArgsFunc = options?.DeliveryArgsFunc ?? (context => context.GetDeliveryEventArgs()); | ||
} | ||
|
||
public override Task InvokeAsync(IPipeContext context, CancellationToken token = default(CancellationToken)) | ||
{ | ||
var retryInfo = GetRetryInformation(context); | ||
AddToPipeContext(context, retryInfo); | ||
return Next.InvokeAsync(context, token); | ||
} | ||
|
||
protected virtual BasicDeliverEventArgs GetDeliveryEventArgs(IPipeContext context) | ||
{ | ||
return DeliveryArgsFunc?.Invoke(context); | ||
} | ||
|
||
protected virtual RetryInformation GetRetryInformation(IPipeContext context) | ||
{ | ||
var devlieryArgs = GetDeliveryEventArgs(context); | ||
return _retryProvider.Get(devlieryArgs); | ||
} | ||
|
||
protected virtual void AddToPipeContext(IPipeContext context, RetryInformation retryInfo) | ||
{ | ||
context.AddRetryInformation(retryInfo); | ||
} | ||
} | ||
} |
Oops, something went wrong.