Skip to content

Commit

Permalink
Archiver Dependencies (BrighterCommand#3090)
Browse files Browse the repository at this point in the history
* Move archive functionality to external service bus, prevent need for archiver to take a direct Outbox dependency

* Move archiver function to bus; organize CP tests

* Test that external bus archive works as expected

* Add tests around archiving async messages

* Add a test to show we only archive dispatched

* Ensure tests for archiving cover the boundary conditions

* Fix issues with the setup not taking in the new ArchiverProvider dependency for the bus
  • Loading branch information
iancooper authored May 16, 2024
1 parent a0d85e9 commit b3346eb
Show file tree
Hide file tree
Showing 84 changed files with 823 additions and 436 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public static IBrighterBuilder UseExternalBus(

brighterBuilder.Services.TryAdd(new ServiceDescriptor(typeof(IAmAnExternalBusService),
(serviceProvider) => BuildExternalBus(
serviceProvider, transactionType, busConfiguration, brighterBuilder.PolicyRegistry, outbox
serviceProvider, transactionType, busConfiguration, brighterBuilder.PolicyRegistry, outbox, busConfiguration.ArchiveProvider
),
ServiceLifetime.Singleton));

Expand Down Expand Up @@ -314,7 +314,7 @@ private static IAmAnExternalBusService BuildExternalBus(IServiceProvider service
Type transactionType,
ExternalBusConfiguration busConfiguration,
IPolicyRegistry<string> policyRegistry,
IAmAnOutbox outbox)
IAmAnOutbox outbox, IAmAnArchiveProvider archiver = null)
{
//Because the bus has specialized types as members, we need to create the bus type dynamically
//again to prevent someone configuring Brighter from having to pass generic types
Expand All @@ -327,11 +327,13 @@ private static IAmAnExternalBusService BuildExternalBus(IServiceProvider service
TransformFactory(serviceProvider),
TransformFactoryAsync(serviceProvider),
outbox,
archiver,
busConfiguration.OutboxBulkChunkSize,
busConfiguration.OutboxTimeout,
busConfiguration.MaxOutStandingMessages,
busConfiguration.MaxOutStandingCheckIntervalMilliSeconds,
busConfiguration.OutBoxBag);
busConfiguration.OutBoxBag,
busConfiguration.ArchiveBatchSize);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static IBrighterBuilder UseOutboxArchiver<TTransaction>(this IBrighterBui
brighterBuilder.Services.TryAddSingleton<TimedOutboxArchiverOptions>(options);
brighterBuilder.Services.AddSingleton<IAmAnArchiveProvider>(archiveProvider);

brighterBuilder.Services.AddHostedService<TimedOutboxArchiver<Message, TTransaction>>();
brighterBuilder.Services.AddHostedService<TimedOutboxArchiver>();

return brighterBuilder;
}
Expand Down
38 changes: 13 additions & 25 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxArchiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,28 @@
namespace Paramore.Brighter.Extensions.Hosting
{

public class TimedOutboxArchiver<TMessage, TTransaction> : IHostedService, IDisposable where TMessage : Message
public class TimedOutboxArchiver(
IServiceScopeFactory serviceScopeFactory,
TimedOutboxArchiverOptions options)
: IHostedService, IDisposable
{
private readonly TimedOutboxArchiverOptions _options;
private static readonly ILogger s_logger = ApplicationLogging.CreateLogger<TimedOutboxSweeper>();
private readonly IAmAnOutbox _outbox;
private readonly IAmAnArchiveProvider _archiveProvider;
private Timer _timer;

private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

public TimedOutboxArchiver(
IAmAnOutbox outbox,
IAmAnArchiveProvider archiveProvider,
TimedOutboxArchiverOptions options)
{
_outbox = outbox;
_archiveProvider = archiveProvider;
_options = options;
}

public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is starting.");
s_logger.LogInformation("Outbox Archiver Service is starting");

_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));
_timer = new Timer(async (e) => await Archive(e, cancellationToken), null, TimeSpan.Zero, TimeSpan.FromSeconds(options.TimerInterval));

return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Archiver Service is stopping.");
s_logger.LogInformation("Outbox Archiver Service is stopping");

_timer?.Change(Timeout.Infinite, 0);

Expand All @@ -56,19 +46,17 @@ private async Task Archive(object state, CancellationToken cancellationToken)
{
if (await _semaphore.WaitAsync(TimeSpan.Zero, cancellationToken))
{
var scope = serviceScopeFactory.CreateScope();
s_logger.LogInformation("Outbox Archiver looking for messages to Archive");
try
{
var outBoxArchiver = new OutboxArchiver<TMessage, TTransaction>(
_outbox,
_archiveProvider,
_options.BatchSize);

await outBoxArchiver.ArchiveAsync(_options.MinimumAge, cancellationToken, _options.ParallelArchiving);
IAmAnExternalBusService externalBusService = scope.ServiceProvider.GetService<IAmAnExternalBusService>();

await externalBusService.ArchiveAsync(options.MinimumAge, cancellationToken);
}
catch (Exception e)
{
s_logger.LogError(e, "Error while sweeping the outbox.");
s_logger.LogError(e, "Error while sweeping the outbox");
}
finally
{
Expand All @@ -79,7 +67,7 @@ private async Task Archive(object state, CancellationToken cancellationToken)
}
else
{
s_logger.LogWarning("Outbox Archiver is still running - abandoning attempt.");
s_logger.LogWarning("Outbox Archiver is still running - abandoning attempt");
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,9 @@ public class TimedOutboxArchiverOptions
/// </summary>
public int TimerInterval { get; set; } = 15;

/// <summary>
/// The maximum number of messages to dispatch.
/// </summary>
public int BatchSize { get; set; } = 100;

/// <summary>
/// The minimum age in hours to Archive
/// </summary>
public int MinimumAge { get; set; } = 24;

/// <summary>
/// Send messages to the archive provider in parallel
/// </summary>
public bool ParallelArchiving { get; set; } = false;
}
}
21 changes: 19 additions & 2 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,7 @@ public TResponse Call<T, TResponse>(T request, int timeOutInMilliseconds)
//we do this to create the channel on the broker, or we won't have anything to send to; we
//retry in case the subscription is poor. An alternative would be to extract the code from
//the channel to create the subscription, but this does not do much on a new queue
_bus.Retry(() => responseChannel.Purge());
Retry(() => responseChannel.Purge());

var outMessage = _bus.CreateMessageFromRequest(request);

Expand All @@ -810,7 +810,7 @@ public TResponse Call<T, TResponse>(T request, int timeOutInMilliseconds)

//now we block on the receiver to try and get the message, until timeout.
s_logger.LogDebug("Awaiting response on {ChannelName}", channelName);
_bus.Retry(() => responseMessage = responseChannel.Receive(timeOutInMilliseconds));
Retry(() => responseMessage = responseChannel.Receive(timeOutInMilliseconds));

TResponse response = default(TResponse);
if (responseMessage.Header.MessageType != MessageType.MT_NONE)
Expand Down Expand Up @@ -907,6 +907,23 @@ private bool HandlerFactoryIsNotEitherIAmAHandlerFactorySyncOrAsync(IAmAHandlerF
}
}

private bool Retry(Action action)
{
var policy = _policyRegistry.Get<Policy>(CommandProcessor.RETRYPOLICY);
var result = policy.ExecuteAndCapture(action);
if (result.Outcome != OutcomeType.Successful)
{
if (result.FinalException != null)
{
s_logger.LogError(result.FinalException, "Exception whilst trying to publish message");
}

return false;
}

return true;
}

private IEnumerable<IGrouping<Type, T>> SplitRequestBatchIntoTypes<T>(IEnumerable<T> requests)
{
return requests.GroupBy(r => r.GetType());
Expand Down
11 changes: 10 additions & 1 deletion src/Paramore.Brighter/ExternalBusConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,16 @@ public interface IAmExternalBusConfiguration
/// </summary>
public class ExternalBusConfiguration : IAmExternalBusConfiguration
{
/// <summary>
/// How big should the batch size be for archiving messages
/// </summary>
public int ArchiveBatchSize { get; set; }

/// <summary>
/// If we want to archive messages, abstracts archival storage
/// </summary>
public IAmAnArchiveProvider ArchiveProvider { get; set; }

/// <summary>
/// How do obtain a connection to the Outbox that is not part of a shared transaction.
/// NOTE: Must implement IAmARelationalDbConnectionProvider
Expand Down Expand Up @@ -199,7 +209,6 @@ public class ExternalBusConfiguration : IAmExternalBusConfiguration
/// </summary>
public bool UseRpc { get; set; }


/// <summary>
/// Initializes a new instance of the <see cref="ExternalBusConfiguration"/> class.
/// </summary>
Expand Down
Loading

0 comments on commit b3346eb

Please sign in to comment.