Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Task.Run to better parallelize tasks #2

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions Rucksack.Tests/BasicIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async Task BasicOneShotIntegrationTest()
await LoadTestRunner.Run(() =>
{
Interlocked.Increment(ref executionCount);
return ValueTask.CompletedTask;
return Task.CompletedTask;
}, new LoadTestOptions
{
LoadStrategy = new OneShotLoadStrategy(count),
Expand All @@ -44,7 +44,7 @@ public async Task BasicRepeatIntegrationTest()
await LoadTestRunner.Run(() =>
{
Interlocked.Increment(ref executionCount);
return ValueTask.CompletedTask;
return Task.CompletedTask;
}, new LoadTestOptions
{
LoadStrategy = new RepeatLoadStrategy(count, interval, duration),
Expand All @@ -57,6 +57,31 @@ await LoadTestRunner.Run(() =>
executionCount.Should().Be(count * durationSeconds / intervalSeconds);
}

[Fact]
public async Task BasicSteppedBurstIntegrationTest()
{
int executionCount = 0;
const int step = 10;
const int from = 10;
const int to = 50;
const int expected = 150; // 10 + 20 + 30 + 40 + 50

await LoadTestRunner.Run(() =>
{
Interlocked.Increment(ref executionCount);
return Task.CompletedTask;
}, new LoadTestOptions
{
LoadStrategy = new SteppedBurstLoadStrategy(step, from, to, FromSeconds(1)),
LoggerFactory = LoggerFactory.Create(builder =>
{
builder.AddXUnit(testOutputHelper);
}),
});

executionCount.Should().Be(expected);
}

[Fact]
public async Task RandomDelayRepeatIntegrationTest()
{
Expand Down
4 changes: 2 additions & 2 deletions Rucksack.Tests/Strategies/OneShotLoadStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public async Task Step_WithCountOf1_CallsActionOnce()
var result = strategy.GenerateLoad(() =>
{
Interlocked.Increment(ref actionCalledCount);
return ValueTask.FromResult(new LoadTaskResult(TimeSpan.Zero));
return Task.FromResult(new LoadTaskResult(TimeSpan.Zero));
}, context);

await StrategyTestHelper.ExecuteStrategyResultAndWait(result);
Expand All @@ -39,7 +39,7 @@ public async Task Step_WithCountOf3_CallsActionThreeTimes()
var result = strategy.GenerateLoad(() =>
{
Interlocked.Increment(ref actionCalledCount);
return ValueTask.FromResult(new LoadTaskResult(TimeSpan.Zero));
return Task.FromResult(new LoadTaskResult(TimeSpan.Zero));
}, context);

await StrategyTestHelper.ExecuteStrategyResultAndWait(result);
Expand Down
12 changes: 6 additions & 6 deletions Rucksack.Tests/Strategies/RepeatLoadStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ public async Task Step_WithCountOf1_CallsActionOnce()
// Arrange
var actionCalledCount = 0;
var strategy = new RepeatLoadStrategy(1, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
var action = () =>
LoadTask action = () =>
{
Interlocked.Increment(ref actionCalledCount);
return ValueTask.FromResult(new LoadTaskResult(TimeSpan.Zero));
return Task.FromResult(new LoadTaskResult(TimeSpan.Zero));
};
var context = new LoadStrategyContext(PreviousResult: null);

Expand All @@ -30,7 +30,7 @@ public async Task Step_WithCountOf1_CallsActionOnce()
result = strategy.GenerateLoad(action, new LoadStrategyContext(PreviousResult: result));

tasks.AddRange(await StrategyTestHelper.ExecuteStrategyResult(result));
await StrategyTestHelper.WhenAll(tasks);
await Task.WhenAll(tasks);

// Assert
actionCalledCount.Should().Be(1);
Expand All @@ -43,10 +43,10 @@ public async Task Step_WithCountOf3_CallsActionThreeTimes()
// Arrange
var actionCalledCount = 0;
var strategy = new RepeatLoadStrategy(3, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
var action = () =>
LoadTask action = () =>
{
Interlocked.Increment(ref actionCalledCount);
return ValueTask.FromResult(new LoadTaskResult(TimeSpan.Zero));
return Task.FromResult(new LoadTaskResult(TimeSpan.Zero));
};
var context = new LoadStrategyContext(PreviousResult: null);

Expand All @@ -62,7 +62,7 @@ public async Task Step_WithCountOf3_CallsActionThreeTimes()
result = strategy.GenerateLoad(action, new LoadStrategyContext(PreviousResult: result));

tasks.AddRange(await StrategyTestHelper.ExecuteStrategyResult(result));
await StrategyTestHelper.WhenAll(tasks);
await Task.WhenAll(tasks);

// Assert
actionCalledCount.Should().Be(3);
Expand Down
10 changes: 5 additions & 5 deletions Rucksack.Tests/Strategies/SteppedBurstLoadStrategyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ public async Task SteppedBurstLoadStrategy_FullTests(int step, int from, int to,
// Arrange
var actionCalledCount = 0;
var strategy = new SteppedBurstLoadStrategy(step, from, to, interval: TimeSpan.FromSeconds(1));
var action = () =>
LoadTask action = () =>
{
Interlocked.Increment(ref actionCalledCount);
return ValueTask.FromResult(new LoadTaskResult(TimeSpan.Zero));
return Task.FromResult(new LoadTaskResult(TimeSpan.Zero));
};

// Act
LoadStrategyResult? result = null;
List<ValueTask<LoadTaskResult>> tasks = [];
List<Task<LoadTaskResult>> tasks = [];

do
{
Expand All @@ -33,7 +33,7 @@ public async Task SteppedBurstLoadStrategy_FullTests(int step, int from, int to,

if (result.Tasks is { } resultTasks)
{
tasks.AddRange(resultTasks);
tasks.AddRange(resultTasks.Select(i => Task.Run(() => i())));
}

if (result.RepeatDelay.HasValue)
Expand All @@ -43,7 +43,7 @@ public async Task SteppedBurstLoadStrategy_FullTests(int step, int from, int to,
}
while (result.RepeatDelay.HasValue);

await StrategyTestHelper.WhenAll(tasks);
await Task.WhenAll(tasks);

// Assert
result.RepeatDelay.Should().BeNull();
Expand Down
16 changes: 5 additions & 11 deletions Rucksack.Tests/Strategies/StrategyTestHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,22 @@ namespace Rucksack.Tests.Strategies;

public static class StrategyTestHelper
{
public static async ValueTask ExecuteStrategyResultAndWait(LoadStrategyResult result)
public static async Task ExecuteStrategyResultAndWait(LoadStrategyResult result)
{
var tasks = await ExecuteStrategyResult(result);

await WhenAll(tasks);
await Task.WhenAll(tasks);
}

public static async ValueTask WhenAll(IEnumerable<ValueTask<LoadTaskResult>> tasks)
public static async Task<List<Task<LoadTaskResult>>> ExecuteStrategyResult(LoadStrategyResult result)
{
foreach (var task in tasks)
{
await task;
}
}
var tasks = result.Tasks?.Select(i => Task.Run(() => i())).ToList() ?? [];

public static async ValueTask<List<ValueTask<LoadTaskResult>>> ExecuteStrategyResult(LoadStrategyResult result)
{
if (result.RepeatDelay.HasValue)
{
await Task.Delay(result.RepeatDelay.Value);
}

return [..result.Tasks ?? []];
return tasks;
}
}
2 changes: 1 addition & 1 deletion Rucksack/ILoadStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ namespace Rucksack;

public interface ILoadStrategy
{
LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, LoadStrategyContext context);
LoadStrategyResult GenerateLoad(LoadTask action, LoadStrategyContext context);
}
6 changes: 2 additions & 4 deletions Rucksack/LoadStrategies/OneShotLoadStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@ namespace Rucksack.LoadStrategies;

public class OneShotLoadStrategy(int count) : ILoadStrategy
{
public LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, LoadStrategyContext context)
public LoadStrategyResult GenerateLoad(LoadTask action, LoadStrategyContext context)
{
if (context.PreviousResult is not null)
{
throw new InvalidOperationException("OneShotLoadStrategy does not support previous results.");
}

var tasks = Enumerable.Range(0, count)
.Select(_ => action())
.ToArray();
var tasks = Enumerable.Repeat(action, count).ToArray();

return new LoadStrategyResult(RepeatDelay: null, Tasks: tasks);
}
Expand Down
8 changes: 3 additions & 5 deletions Rucksack/LoadStrategies/RepeatLoadStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace Rucksack.LoadStrategies;
public class RepeatLoadStrategy(int countPerInterval, TimeSpan interval, TimeSpan totalDuration)
: ILoadStrategy
{
public LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, LoadStrategyContext context)
public LoadStrategyResult GenerateLoad(LoadTask action, LoadStrategyContext context)
{
RepeatLoadStrategyResult result;
int iteration = 1;
Expand All @@ -29,9 +29,7 @@ public LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, L
return LoadStrategyResult.Finished;
}

var tasks = Enumerable.Range(0, countPerInterval)
.Select(_ => action())
.ToArray();
var tasks = Enumerable.Repeat(action, countPerInterval).ToArray();

return result with
{
Expand All @@ -40,6 +38,6 @@ public LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, L
};
}

private record RepeatLoadStrategyResult(TimeSpan? RepeatDelay, Stopwatch Stopwatch, int Iteration, IReadOnlyList<ValueTask<LoadTaskResult>>? Tasks)
private record RepeatLoadStrategyResult(TimeSpan? RepeatDelay, Stopwatch Stopwatch, int Iteration, IReadOnlyList<LoadTask>? Tasks)
: LoadStrategyResult(RepeatDelay, Tasks);
}
8 changes: 3 additions & 5 deletions Rucksack/LoadStrategies/SteppedBurstLoadStrategy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public SteppedBurstLoadStrategy(int step, int from, int to, TimeSpan interval)
_interval = interval;
}

public LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, LoadStrategyContext context)
public LoadStrategyResult GenerateLoad(LoadTask action, LoadStrategyContext context)
{
SteppedLoadStrategyResult result;
int currentCount = _from;
Expand All @@ -52,9 +52,7 @@ public LoadStrategyResult GenerateLoad(Func<ValueTask<LoadTaskResult>> action, L
currentCount = result.CurrentCount + _step;
}

var tasks = Enumerable.Range(0, currentCount)
.Select(_ => action())
.ToArray();
var tasks = Enumerable.Repeat(action, currentCount).ToArray();

return result with
{
Expand All @@ -68,6 +66,6 @@ internal static bool IsFinished(int from, int to, int currentCount) =>
(from < to && currentCount >= to)
|| (from > to && currentCount <= to);

private record SteppedLoadStrategyResult(TimeSpan? RepeatDelay, int CurrentCount, IReadOnlyList<ValueTask<LoadTaskResult>>? Tasks)
private record SteppedLoadStrategyResult(TimeSpan? RepeatDelay, int CurrentCount, IReadOnlyList<LoadTask>? Tasks)
: LoadStrategyResult(RepeatDelay, Tasks);
}
2 changes: 1 addition & 1 deletion Rucksack/LoadStrategyResult.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace Rucksack;

public record LoadStrategyResult(TimeSpan? RepeatDelay, IReadOnlyList<ValueTask<LoadTaskResult>>? Tasks)
public record LoadStrategyResult(TimeSpan? RepeatDelay, IReadOnlyList<LoadTask>? Tasks)
{
public static LoadStrategyResult Finished { get; } = new(null, null);
}
3 changes: 3 additions & 0 deletions Rucksack/LoadTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
namespace Rucksack;

public delegate Task<LoadTaskResult> LoadTask();
10 changes: 5 additions & 5 deletions Rucksack/LoadTestRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ public static void Run(Action action, LoadTestOptions options) =>
Run(() =>
{
action();
return ValueTask.CompletedTask;
return Task.CompletedTask;
}, options).Wait();

public static async Task Run(Func<ValueTask> action, LoadTestOptions options)
public static async Task Run(Func<Task> action, LoadTestOptions options)
{
var loggerFactory = options.LoggerFactory ?? LoggerFactory.Create(builder =>
{
Expand All @@ -24,7 +24,7 @@ public static async Task Run(Func<ValueTask> action, LoadTestOptions options)
logger.LogInformation("Rucksack is running...");

LoadStrategyResult? result = null;
List<ValueTask<LoadTaskResult>> allTasks = [];
List<Task<LoadTaskResult>> allTasks = [];

while (true)
{
Expand All @@ -36,7 +36,7 @@ public static async Task Run(Func<ValueTask> action, LoadTestOptions options)
{
logger.LogDebug("Enqueueing {Count} new tasks", tasks.Count);

allTasks.AddRange(tasks);
allTasks.AddRange(tasks.Select(task => Task.Run(() => task())));
}

if (result.RepeatDelay == null)
Expand Down Expand Up @@ -87,7 +87,7 @@ public static async Task Run(Func<ValueTask> action, LoadTestOptions options)

return;

async ValueTask<LoadTaskResult> LoadAction()
async Task<LoadTaskResult> LoadAction()
{
var stopwatch = Stopwatch.StartNew();

Expand Down