Skip to content

Add job throttle #40

Merged
merged 5 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,20 @@ Horarium guarantees that a job would run **exactly once**

Every Horarium instance consults MongoDB about new jobs to run every 100ms (default), thus creating some load on the DB server. This interval can be changed in ```HorariumSettings```

If you want to decrease load, you can use job throttling that will automatically increase interval if there are no jobs available after certain attempts. To enable this feature, pass `JobThrottleSettings` to `HorariumSettings` with property `UseJobThrottle` set to `true`.

```csharp
var settings = new HorariumSettings
{
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true
}
};
```

For more information about configuration, see `JobThrottleSettings`

## Using Horarium with SimpleInjector

To use Horarium with SimpleInjector one should implement its own `IJobFactory`, using `Container` from `SimpleInjector`. For example:
Expand Down
117 changes: 117 additions & 0 deletions src/Horarium.Test/RunnerJobTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,123 @@ public async Task Start_WontRecoverBeforeIntervalTimeout_AfterFailedDB()
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Once);
}

[Fact]
public async Task Start_ExecutionWithDelay_WithThrottle()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(1),
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true,
IntervalMultiplier = 1,
JobRetrievalAttempts = 1
}
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500));
jobRepositoryMock.Invocations.Clear();

await Task.Delay(settings.IntervalStartJob + settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier));

// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Once);
}

[Fact]
public async Task Start_ExecutionWithDelay_IncreaseInterval()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();

jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ReturnsAsync(() => null);

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(1),
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true,
IntervalMultiplier = 1,
JobRetrievalAttempts = 1,
}
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500));
jobRepositoryMock.Invocations.Clear();

var interval = settings.IntervalStartJob +
settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier);
await Task.Delay(interval);
interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier);
await Task.Delay(interval);
interval += settings.IntervalStartJob.Multiply(settings.JobThrottleSettings.IntervalMultiplier);
await Task.Delay(interval);

// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Exactly(3));
}

[Fact]
public async Task Start_ExecutionWithDelay_MaxInterval()
{
// Arrange
var jobRepositoryMock = new Mock<IJobRepository>();

jobRepositoryMock.Setup(x => x.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()))
.ReturnsAsync(() => null);

var settings = new HorariumSettings
{
IntervalStartJob = TimeSpan.FromSeconds(1),
JobThrottleSettings = new JobThrottleSettings
{
UseJobThrottle = true,
IntervalMultiplier = 1,
JobRetrievalAttempts = 1,
MaxJobThrottleInterval = TimeSpan.FromSeconds(1)
}
};

var runnerJobs = new RunnerJobs(jobRepositoryMock.Object,
settings,
new JsonSerializerSettings(),
Mock.Of<IHorariumLogger>(),
Mock.Of<IExecutorJob>(),
Mock.Of<IUncompletedTaskList>());

// Act
runnerJobs.Start();
await Task.Delay(settings.IntervalStartJob - TimeSpan.FromMilliseconds(500));
jobRepositoryMock.Invocations.Clear();

await Task.Delay(TimeSpan.FromSeconds(5));
// Assert
jobRepositoryMock.Verify(r => r.GetReadyJob(It.IsAny<string>(), It.IsAny<TimeSpan>()), Times.Exactly(5));
}

[Fact]
public async Task Start_NextJobStarted_AddsJobTaskToUncompletedTasks()
{
Expand Down
49 changes: 46 additions & 3 deletions src/Horarium/Handlers/RunnerJobs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class RunnerJobs : IRunnerJobs
private readonly IExecutorJob _executorJob;
private Task _runnerTask;
private readonly IUncompletedTaskList _uncompletedTaskList;

private readonly TimeSpan _defaultJobThrottleInterval = TimeSpan.FromMilliseconds(100);

private CancellationToken _cancellationToken;
private readonly CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();
Expand Down Expand Up @@ -100,11 +102,29 @@ private async Task<JobMetadata> GetReadyJob()

private async Task StartRunnerInternal(CancellationToken cancellationToken)
{
var jobWaitTime = _settings.IntervalStartJob;

while (true)
{
var isJobRan = await TryRunJob(cancellationToken, jobWaitTime);
if (!_settings.JobThrottleSettings.UseJobThrottle)
{
jobWaitTime = _settings.IntervalStartJob;
continue;
}

jobWaitTime = !isJobRan ? GetNextIntervalStartJob(jobWaitTime) : _settings.IntervalStartJob;
}
}

private async Task<bool> TryRunJob(CancellationToken cancellationToken, TimeSpan waitTime)
{
for (var i = 0; i < _settings.JobThrottleSettings.JobRetrievalAttempts; i++)
{
var job = await GetReadyJob();
var isJobReady = job != null;

if (job != null)
if (isJobReady)
{
_horariumLogger.Debug("Try to Run jobMetadata...");

Expand All @@ -117,11 +137,34 @@ private async Task StartRunnerInternal(CancellationToken cancellationToken)
throw new TaskCanceledException();
}

if (!_settings.IntervalStartJob.Equals(TimeSpan.Zero))
if (isJobReady)
{
await Task.Delay(_settings.IntervalStartJob, cancellationToken);
return true;
}

if (!waitTime.Equals(TimeSpan.Zero))
{
await Task.Delay(waitTime, cancellationToken);
}
}

return false;
}

private TimeSpan GetNextIntervalStartJob(TimeSpan currentInterval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please, create unit tests on this function with same cases

{
if (currentInterval.Equals(TimeSpan.Zero))
{
return _defaultJobThrottleInterval;
}

var nextInterval =
currentInterval +
TimeSpan.FromTicks((long) (currentInterval.Ticks * _settings.JobThrottleSettings.IntervalMultiplier));

var maxInterval = _settings.JobThrottleSettings.MaxJobThrottleInterval;

return nextInterval > maxInterval ? maxInterval : nextInterval;
}
}
}
2 changes: 2 additions & 0 deletions src/Horarium/HorariumSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public class HorariumSettings

public TimeSpan ObsoleteExecutingJob { get; set; } = TimeSpan.FromMinutes(5);

public JobThrottleSettings JobThrottleSettings { get; set; } = new JobThrottleSettings();

public IJobScopeFactory JobScopeFactory { get; set; } = new DefaultJobScopeFactory();

public IHorariumLogger Logger { get; set; } = new EmptyLogger();
Expand Down
28 changes: 28 additions & 0 deletions src/Horarium/JobThrottleSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System;

namespace Horarium
{
public class JobThrottleSettings
{
/// <summary>
/// When `true`, IntervalStartJob will automatically increase if there is no jobs available
/// </summary>
public bool UseJobThrottle { get; set; }

/// <summary>
/// After all attempts are exhausted, waiting interval is increased by formula:
/// <c>currentInterval + (currentInterval * intervalMultiplier)</c>
/// </summary>
public int JobRetrievalAttempts { get; set; } = 10;

/// <summary>
/// Multiplier to get the next waiting interval
/// </summary>
public double IntervalMultiplier { get; set; } = 0.25;

/// <summary>
/// Maximum waiting interval
/// </summary>
public TimeSpan MaxJobThrottleInterval { get; set; } = TimeSpan.FromSeconds(30);
}
}