Skip to content

Commit

Permalink
Reuse thread state when prewarming addresses in parallel (#7984)
Browse files Browse the repository at this point in the history
  • Loading branch information
benaadams authored Dec 31, 2024
1 parent c924576 commit 4923d0c
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,18 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe
{
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (preWarmer: this, block, stateRoot),
ParallelUnbalancedWork.For(0, block.Withdrawals.Length, parallelOptions, (envPool: _envPool, block, stateRoot),
static (i, state) =>
{
IReadOnlyTxProcessorSource env = state.preWarmer._envPool.Get();
IReadOnlyTxProcessorSource env = state.envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot);
scope.WorldState.WarmUp(state.block.Withdrawals[i].Address);
}
finally
{
state.preWarmer._envPool.Return(env);
state.envPool.Return(env);
}

return state;
Expand Down Expand Up @@ -242,11 +242,12 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

ObjectPool<IReadOnlyTxProcessorSource> envPool = PreWarmer._envPool;
try
{
if (SystemTxAccessLists is not null)
{
var env = PreWarmer._envPool.Get();
var env = envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
Expand All @@ -258,38 +259,36 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
finally
{
PreWarmer._envPool.Return(env);
envPool.Return(env);
SystemTxAccessLists.Dispose();
}
}

ParallelUnbalancedWork.For(0, block.Transactions.Length, parallelOptions, (preWarmer: PreWarmer, block, StateRoot),
AddressWarmingState baseState = new(envPool, block, StateRoot);

ParallelUnbalancedWork.For(
0,
block.Transactions.Length,
parallelOptions,
baseState.InitThreadState,
static (i, state) =>
{
Transaction tx = state.block.Transactions[i];
Transaction tx = state.Block.Transactions[i];
Address? sender = tx.SenderAddress;

var env = state.preWarmer._envPool.Get();
try
if (sender is not null)
{
using IReadOnlyTxProcessingScope scope = env.Build(state.StateRoot);
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
}
Address to = tx.To;
if (to is not null)
{
scope.WorldState.WarmUp(to);
}
state.Scope.WorldState.WarmUp(sender);
}
finally
Address to = tx.To;
if (to is not null)
{
state.preWarmer._envPool.Return(env);
state.Scope.WorldState.WarmUp(to);
}

return state;
});
},
AddressWarmingState.FinallyAction);
}
catch (OperationCanceledException)
{
Expand All @@ -298,6 +297,37 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
}
}

private readonly struct AddressWarmingState(ObjectPool<IReadOnlyTxProcessorSource> envPool, Block block, Hash256 stateRoot) : IDisposable
{
public static Action<AddressWarmingState> FinallyAction { get; } = DisposeThreadState;

public readonly ObjectPool<IReadOnlyTxProcessorSource> EnvPool = envPool;
public readonly Block Block = block;
public readonly Hash256 StateRoot = stateRoot;
public readonly IReadOnlyTxProcessorSource? Env;
public readonly IReadOnlyTxProcessingScope? Scope;

public AddressWarmingState(ObjectPool<IReadOnlyTxProcessorSource> envPool, Block block, Hash256 stateRoot, IReadOnlyTxProcessorSource env, IReadOnlyTxProcessingScope scope) : this(envPool, block, stateRoot)
{
Env = env;
Scope = scope;
}

public AddressWarmingState InitThreadState()
{
IReadOnlyTxProcessorSource env = EnvPool.Get();
return new(EnvPool, Block, StateRoot, env, scope: env.Build(StateRoot));
}

public void Dispose()
{
Scope.Dispose();
EnvPool.Return(Env);
}

private static void DisposeThreadState(AddressWarmingState state) => state.Dispose();
}

private class ReadOnlyTxProcessingEnvPooledObjectPolicy(ReadOnlyTxProcessingEnvFactory envFactory) : IPooledObjectPolicy<IReadOnlyTxProcessorSource>
{
public IReadOnlyTxProcessorSource Create() => envFactory.Create();
Expand Down
67 changes: 42 additions & 25 deletions src/Nethermind/Nethermind.Core/Threading/ParallelUnbalancedWork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public static void For(int fromInclusive, int toExclusive, ParallelOptions paral
{
int threads = parallelOptions.MaxDegreeOfParallelism > 0 ? parallelOptions.MaxDegreeOfParallelism : Environment.ProcessorCount;

Data data = new(threads, fromInclusive, toExclusive, action);
Data data = new(threads, fromInclusive, toExclusive, action, parallelOptions.CancellationToken);

for (int i = 0; i < threads - 1; i++)
{
Expand All @@ -55,6 +55,8 @@ public static void For(int fromInclusive, int toExclusive, ParallelOptions paral
{
data.Event.Wait();
}

parallelOptions.CancellationToken.ThrowIfCancellationRequested();
}

/// <summary>
Expand Down Expand Up @@ -137,16 +139,22 @@ private ParallelUnbalancedWork(Data data)
/// </summary>
public void Execute()
{
int i = _data.Index.GetNext();
while (i < _data.ToExclusive)
try
{
int i = _data.Index.GetNext();
while (i < _data.ToExclusive)
{
if (_data.CancellationToken.IsCancellationRequested) return;
_data.Action(i);
// Get the next index
i = _data.Index.GetNext();
}
}
finally
{
_data.Action(i);
// Get the next index
i = _data.Index.GetNext();
// Signal that this thread has completed its work
_data.MarkThreadCompleted();
}

// Signal that this thread has completed its work
_data.MarkThreadCompleted();
}

/// <summary>
Expand All @@ -173,14 +181,15 @@ private struct PaddedValue(int value)
/// <summary>
/// Represents the base data shared among threads during parallel execution.
/// </summary>
private class BaseData(int threads, int fromInclusive, int toExclusive)
private class BaseData(int threads, int fromInclusive, int toExclusive, CancellationToken token)
{
/// <summary>
/// Gets the shared counter for indices.
/// </summary>
public SharedCounter Index { get; } = new SharedCounter(fromInclusive);
public SemaphoreSlim Event { get; } = new(initialCount: 0);
private int _activeThreads = threads;
public CancellationToken CancellationToken { get; } = token;

/// <summary>
/// Gets the exclusive upper bound of the range.
Expand Down Expand Up @@ -212,8 +221,8 @@ public int MarkThreadCompleted()
/// <summary>
/// Represents the data shared among threads for the parallel action.
/// </summary>
private class Data(int threads, int fromInclusive, int toExclusive, Action<int> action) :
BaseData(threads, fromInclusive, toExclusive)
private class Data(int threads, int fromInclusive, int toExclusive, Action<int> action, CancellationToken token) :
BaseData(threads, fromInclusive, toExclusive, token)
{
/// <summary>
/// Gets the action to be executed for each iteration.
Expand Down Expand Up @@ -254,7 +263,7 @@ public static void For(
: Environment.ProcessorCount;

// Create shared data with thread-local initializers and finalizers
var data = new Data<TLocal>(threads, fromInclusive, toExclusive, action, init, initValue, @finally);
var data = new Data<TLocal>(threads, fromInclusive, toExclusive, action, init, initValue, @finally, parallelOptions.CancellationToken);

// Queue work items to the thread pool for all threads except the current one
for (int i = 0; i < threads - 1; i++)
Expand All @@ -270,6 +279,8 @@ public static void For(
{
data.Event.Wait();
}

parallelOptions.CancellationToken.ThrowIfCancellationRequested();
}

/// <summary>
Expand All @@ -284,16 +295,21 @@ public static void For(
public void Execute()
{
TLocal? value = _data.Init();
int i = _data.Index.GetNext();
while (i < _data.ToExclusive)
try
{
value = _data.Action(i, value);
i = _data.Index.GetNext();
int i = _data.Index.GetNext();
while (i < _data.ToExclusive)
{
if (_data.CancellationToken.IsCancellationRequested) return;
value = _data.Action(i, value);
i = _data.Index.GetNext();
}
}
finally
{
_data.Finally(value);
_data.MarkThreadCompleted();
}

_data.Finally(value);

_data.MarkThreadCompleted();
}

/// <summary>
Expand All @@ -304,9 +320,10 @@ private class Data<TValue>(int threads,
int fromInclusive,
int toExclusive,
Func<int, TLocal, TLocal> action,
Func<TValue>? init = null,
TValue? initValue = default,
Action<TValue>? @finally = null) : BaseData(threads, fromInclusive, toExclusive)
Func<TValue>? init,
TValue? initValue,
Action<TValue>? @finally,
CancellationToken token) : BaseData(threads, fromInclusive, toExclusive, token)
{
/// <summary>
/// Gets the action to be executed for each iteration.
Expand All @@ -317,7 +334,7 @@ private class Data<TValue>(int threads,
/// Initializes the thread-local data.
/// </summary>
/// <returns>The initialized thread-local data.</returns>
public TValue Init() => initValue ?? (init is not null ? init.Invoke() : default)!;
public TValue Init() => init is not null ? init.Invoke() : initValue!;

/// <summary>
/// Finalizes the thread-local data.
Expand Down

0 comments on commit 4923d0c

Please sign in to comment.