Skip to content

Commit

Permalink
Merge pull request #26 from dbones-labs/invert-filter
Browse files Browse the repository at this point in the history
Invert filter
  • Loading branch information
dbones authored Oct 27, 2024
2 parents db3b8e5 + 694a492 commit d0092d0
Show file tree
Hide file tree
Showing 18 changed files with 242 additions and 10 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# laters

[![release](https://img.shields.io/github/v/release/dbones-labs/laters?logo=nuget)](https://github.com/dbones-labs/laters/releases) [![Nuget](https://img.shields.io/badge/nuget-laters-blue)](https://github.com/orgs/dbones-labs/packages?repo_name=laters)
[![release](https://img.shields.io/github/v/release/dbones-labs/laters?logo=nuget)](https://github.com/dbones-labs/laters/releases) [![Nuget](https://img.shields.io/badge/nuget-laters-blue)](https://www.nuget.org/packages?q=laters&includeComputedFrameworks=true&prerel=true&sortby=relevance) [![Github Nuget](https://img.shields.io/badge/github--nuget-laters-blue)](https://github.com/orgs/dbones-labs/packages?repo_name=laters)
[![docs](https://img.shields.io/badge/docs-laters-blue)](https://dbones-labs.github.io/laters/)


Expand Down
7 changes: 7 additions & 0 deletions docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ export default defineConfig({
{ text: 'EntityFramework', link: '/storage/entity-framework' },
{ text: 'custom', link: '/storage/custom' }
]
},
{
text: 'Performance',
items: [
{ text: 'Overview', link: '/performance/performance' },
{ text: 'rate-limiting', link: '/performance/rate-limiting' }
]
}
],

Expand Down
Binary file added docs/performance/gobal-throttle.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/performance/multiple-windows.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/performance/named-window-throttle.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
75 changes: 75 additions & 0 deletions docs/performance/performance.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
---
outline: deep
---

# Performance

> [!NOTE]
> The architecture makes use of your loadbalancer, however here are a few more controls avalible to you
While dealing with scheduled tasks, you may need to apply controls to

- increase throughput
- control how many jobs are being processed in a timeframe

while these seem to be in contradiction (maximising and limiting) both are under the hood of performance

## Parallel Processing

> [!NOTE]
> please note the defaults are: `InMemoryWorkerQueueMax` 45 and `NumberOfProcessingThreads` 10
While the leader loads queued jobs to be processed, you can control

- The number of Jobs to batch
- The number of Jobs to process in parallel

Configure both of these togeher in order to tune the performance to your needs.

howe this works is

the batch, will load into memory all the jobs to process (only the IDs)

![theading](./threading.png)

The `In Memory Queue`, reduces the commincation with the database.

The `Thread Pool`, increased the number of concurrency in processed jobs (taking advantage of the threading in .NET).


## Rate limiting

> [!NOTE]
> `gobal` is set to `1_000_000` in a `1` second window , it is **recommened** to override this.
This is a mechanism where we use Tumberling winows to limit the number of Jobs that can be processed within a given time frame.

An example of this is when you application has extreme high through put which adds jobs to be processed, which can wait a while. This will allow you to throttle the processing and minimise the amount of hardware you may require (without a rate-limiter you could cause massive drain on your hardware or on downstream services)

![window](./window.png)

The leader will hold details of how many Jobs have been processed and in which windows.

### Types of window

there are 2 types of window

- `global` - set this to set the TOTAL number of message that can be processed in a given window
- `named windows` - you can provide named tumbling windows to provide sub limits for certain jobs.

If a job is queued without a named-window, then only the global window is applied.

![two-windows](./multiple-windows.png)

In this case you can see all jobs are rate-limited

- via global fist
- then 3 are processed by the green-window and 3 by the blue-window

![global-limiting](./gobal-throttle.png)

If we process more items in the time frame, for the global, any items will wait till the window count goes back down.

![named-window-throttling](./named-window-throttle.png)

Limiting is finally applied at a named window, for Jobs which are associated with that window, in this case the 4th blue item will be processed when the window clears down.
91 changes: 91 additions & 0 deletions docs/performance/rate-limiting.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
---
outline: deep
---

# Rate Limiting

This is a powerful mechanism which throttles jobs, for more information please read the [Performace](./performance.md) for more information on how this works

In this page we show how we can setup windows and then queue jobs against a window

## Config

The windows are setup in the config (`appsettings.json`)

Add the windows, here we add `green` and `blue`

```json
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"Laters" : {
"Windows": {
"green": {
"Max": 500,
"SizeInSeconds": 360
},
"blue": {
"Max": 200,
"SizeInSeconds": 60
}
}
}
}
```

### override global

> [!NOTE]
> All jobs will be rate limited through the global window, so remember to set this high enough for your total throughput.
`global` is always present, with defaults, however you can override it

```json
{
//other config
"Laters" : {
"Windows": {
"gloabl": {
"Max": 20000,
"SizeInSeconds": 5
}
}
}
}
```

## Code

Each job that is queued will be processed in a window, and you can supply the name using the options.

### Lookup

Setup constaints for each window name

Although not required this will help you with magic strings

```csharp
public static class Windows {
public const string Blue = "blue";
public const string Gree = "green";
}
```

### Enqueue into a window

To queue the job and throttle with a window, you will need to provide it via the `options` and say which `WindowName`

Here is how we could setup a job to be rate-limited in the `blue` window:

```csharp
var options = new OnceOptions();
options.Delivery.WindowName = Windows.Blue;

var sayHello = new SayHello { Name = "bob" };

schedule.ForLater(sayHello, options);
```
Binary file added docs/performance/threading.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/performance/window.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion src/Laters.Data.EntityFrameworkCore/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task<List<Candidate>> GetJobsToProcess(List<string> rateLimitNames,
var items = await _queryDbContext.Jobs
.Where(x => x.ScheduledFor <= SystemDateTime.UtcNow)
.Where(x => !x.DeadLettered)
.Where(x => rateLimitNames.Contains(x.WindowName))
.Where(x => !rateLimitNames.Contains(x.WindowName))
.OrderByDescending(x => x.ScheduledFor)
.Skip(skip)
.Take(take)
Expand Down
2 changes: 1 addition & 1 deletion src/Laters.Data.Marten/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public async Task<List<Candidate>> GetJobsToProcess(List<string> rateLimitNames,
.Query<Job>()
.Where(x => x.ScheduledFor <= SystemDateTime.UtcNow)
.Where(x => !x.DeadLettered)
.Where(x => rateLimitNames.Contains(x.WindowName))
.Where(x => !rateLimitNames.Contains(x.WindowName))
.OrderByDescending(x => x.ScheduledFor)
.Skip(skip)
.Take(take)
Expand Down
2 changes: 1 addition & 1 deletion src/Laters.Tests/Infrastructure/DefaultTestServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public async Task Setup()
var pgHost = Environment.GetEnvironmentVariable("PG_HOST");
if (string.IsNullOrWhiteSpace(pgHost))
{
pgHost = "postgres";
pgHost = "localhost";
}
Console.WriteLine($"PG_HOST: {pgHost}");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace Laters.Tests.ServerSide.Windows;
using Machine.Specifications;
using PowerAssert;
using ServerProcessing.Windows;
using Marten;

[Subject("window")]
[Tags("class-test")]
Expand Down Expand Up @@ -47,11 +48,11 @@ class When_tumbler_has_entries_are_outside_of_the_global_window
Because of = async () =>
{
SystemDateTime.Set(() => _observedSlice);
await Rig.Wait(() => _sut.AreWeOkToProcessThisWindow("global"));
await Rig.Wait(() => _sut!.AreWeOkToProcessThisWindow("global"));
};

It should_remove_obsolete_entries = () =>
PAssert.IsTrue(() => _sut.GetWindowsWhichAreWithinLimits().Contains("global"));
PAssert.IsTrue(() => _sut!.GetWindowsWhichHaveReachedTheirLimits().IsEmpty());

Cleanup after = () =>
{
Expand Down
2 changes: 1 addition & 1 deletion src/Laters/Data/InMemory/InMemorySession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Task<List<Candidate>> GetJobsToProcess(List<string> rateLimitNames, int s
var items = GetEntities<Job>()
.Where(x => x.ScheduledFor <= SystemDateTime.UtcNow)
.Where(x => !x.DeadLettered)
.Where(x => rateLimitNames.Contains(x.WindowName))
.Where(x => !rateLimitNames.Contains(x.WindowName))
.OrderByDescending(x => x.ScheduledFor)
.Skip(skip)
.Take(take)
Expand Down
27 changes: 26 additions & 1 deletion src/Laters/ServerProcessing/Engine/JobWorkerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
using Infrastructure;
using Infrastructure.Telemetry;


/// <summary>
/// the main part of the leader, which is responsible for processing the jobs.
/// </summary>
public class JobWorkerQueue : IDisposable
{
//injected
Expand All @@ -24,6 +28,16 @@ public class JobWorkerQueue : IDisposable
CandidatePopulateTrigger _populateTrigger;
ContinuousLambda _populateLambda;

/// <summary>
/// creates an instance of the JobWorkerQueue
/// </summary>
/// <param name="leaderContext"></param>
/// <param name="tumbler"></param>
/// <param name="serviceProvider"></param>
/// <param name="configuration"></param>
/// <param name="workerClient"></param>
/// <param name="telemetry"></param>
/// <param name="logger"></param>
public JobWorkerQueue(
LeaderContext leaderContext,
ITumbler tumbler,
Expand All @@ -47,6 +61,9 @@ public JobWorkerQueue(
new ContinuousLambda(nameof(PopulateCandidates), async () => await PopulateCandidates(), _populateTrigger);
}

/// <summary>
/// the the JobWorkerQueue component
/// </summary>
public void Initialize(CancellationToken cancellationToken)
{
_logger.LogInformation("Initialize the JobWorkerQueue component");
Expand All @@ -67,7 +84,15 @@ protected virtual async Task PopulateCandidates(CancellationToken cancellationTo
using (var workingScope = _serviceProvider.CreateScope())
{
var querySession = workingScope.ServiceProvider.GetRequiredService<ISession>();
var windowNames = _tumbler.GetWindowsWhichAreWithinLimits();
var windowNames = _tumbler.GetWindowsWhichHaveReachedTheirLimits();

//the glabal means we need to skip all windows.
if (windowNames.Count == 1 && windowNames[0] == LatersConstants.GlobalTumbler)
{
_logger.LogInformation("the global window has reached its limit");
_populateTrigger.SetWhenToFetch(FetchStrategy.Wait);
return;
}

var take = _configuration.InMemoryWorkerQueueMax; //this is the in memory queue. (batch)
candidates = await querySession.GetJobsToProcess(windowNames, 0, take);
Expand Down
22 changes: 21 additions & 1 deletion src/Laters/ServerProcessing/Windows/DefaultTumbler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public DefaultTumbler(LatersConfiguration configuration)
}
}

/// <inheritdoc />
public void Initialize(CancellationToken cancellationToken)
{
_globalWindow.Initialize(cancellationToken);
Expand All @@ -45,12 +46,14 @@ public void Initialize(CancellationToken cancellationToken)
}
}

/// <inheritdoc />
public bool AreWeOkToProcessThisWindow(string windowName)
{
if (_globalWindow.ReachedMax) return false;
return !_namedWindows.TryGetValue(windowName, out var window) || window.AvailableCapacity;
}

[Obsolete("This method is not used, and will be removed in the future.", true)]
public List<string> GetWindowsWhichAreWithinLimits()
{
var names = new List<string>();
Expand All @@ -69,7 +72,24 @@ public List<string> GetWindowsWhichAreWithinLimits()

return names;
}

/// <inheritdoc />
public List<string> GetWindowsWhichHaveReachedTheirLimits()
{
if (_globalWindow.ReachedMax)
{
return new List<string> {LatersConstants.GlobalTumbler};
}

var names = new List<string>();

var availableWindows = _namedWindows.Where(x => x.Value.ReachedMax).Select(x => x.Key);
names.AddRange(availableWindows);

return names;
}

/// <inheritdoc />
public void RecordJobQueue(string rateName)
{
var dateTime = SystemDateTime.UtcNow;
Expand All @@ -79,7 +99,7 @@ public void RecordJobQueue(string rateName)
window.AddItemsToWindow(dateTime, 1);
}
}

Task UpdateTrigger()
{
var shouldRun = _globalWindow.AvailableCapacity;
Expand Down
8 changes: 7 additions & 1 deletion src/Laters/ServerProcessing/Windows/ITumbler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@ public interface ITumbler
/// find all windows which have available capacity
/// </summary>
/// <returns>all window names which can process</returns>
[Obsolete("This method is not used, and will be removed in the future.", true)]
List<string> GetWindowsWhichAreWithinLimits();


/// <summary>
/// get the windows which have reached their limits
/// </summary>
/// <returns>either `global` or the list of windows which have reached there limits</returns>
public List<string> GetWindowsWhichHaveReachedTheirLimits();

/// <summary>
/// record a job queue for a given window
Expand Down
Loading

0 comments on commit d0092d0

Please sign in to comment.