Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/recover whole path #8120

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Nethermind.State;
using Nethermind.Core.Eip2930;
using Nethermind.Core.Collections;
using Nethermind.Trie;

namespace Nethermind.Consensus.Processing;

Expand Down Expand Up @@ -106,6 +107,9 @@ private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spe
using IReadOnlyTxProcessingScope scope = env.Build(state.stateRoot);
scope.WorldState.WarmUp(state.block.Withdrawals[i].Address);
}
catch (MissingTrieNodeException)
{
}
finally
{
state.envPool.Return(env);
Expand Down Expand Up @@ -276,14 +280,21 @@ private void WarmupAddresses(ParallelOptions parallelOptions, Block block)
Transaction tx = state.Block.Transactions[i];
Address? sender = tx.SenderAddress;

if (sender is not null)
try
{
state.Scope.WorldState.WarmUp(sender);
if (sender is not null)
{
state.Scope.WorldState.WarmUp(sender);
}

Address to = tx.To;
if (to is not null)
{
state.Scope.WorldState.WarmUp(to);
}
}
Address to = tx.To;
if (to is not null)
catch (MissingTrieNodeException)
{
state.Scope.WorldState.WarmUp(to);
}

return state;
Expand Down
39 changes: 39 additions & 0 deletions src/Nethermind/Nethermind.Core.Test/AutoCancelTokenSourceTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Threading;
using FluentAssertions;
using Nethermind.Core.Extensions;
using Nethermind.Core.Utils;
using NUnit.Framework;

namespace Nethermind.Core.Test;

public class AutoCancelTokenSourceTests
{
[Test]
public void AutoCancelOnExitClosure()
{
CancellationToken TaskWithInnerCancellation(CancellationToken token)
{
using AutoCancelTokenSource cts = token.CreateChildTokenSource();
return cts.Token;
}

TaskWithInnerCancellation(default).IsCancellationRequested.Should().BeTrue();
}

[Test]
public void AutoCancelPropagateParentCancellation()
{
using CancellationTokenSource cts = new CancellationTokenSource();

using AutoCancelTokenSource acts = cts.Token.CreateChildTokenSource();

acts.Token.IsCancellationRequested.Should().BeFalse();

cts.Cancel();

acts.Token.IsCancellationRequested.Should().BeTrue();
}
}
8 changes: 6 additions & 2 deletions src/Nethermind/Nethermind.Core.Test/Modules/NetworkModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,12 @@ protected override void Load(ContainerBuilder builder)
{
ILogManager logManager = ctx.Resolve<ILogManager>();
ctx.Resolve<IWorldStateManager>().InitializeNetwork(
new GetNodeDataTrieNodeRecovery(peerPool, logManager),
new SnapTrieNodeRecovery(peerPool, logManager));
new PathNodeRecovery(
new NodeDataRecovery(peerPool!, ctx.Resolve<INodeStorage>(), logManager),
new SnapRangeRecovery(peerPool!, logManager),
logManager
)
);
})

// TODO: LastNStateRootTracker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core.Utils;

namespace Nethermind.Core.Extensions
{
Expand Down Expand Up @@ -54,5 +55,13 @@ public static CancellationTokenSource ThatCancelAfter(this CancellationTokenSour
cts.CancelAfter(delay);
return cts;
}

public static AutoCancelTokenSource CreateChildTokenSource(this CancellationToken parentToken, TimeSpan delay = default)
{
CancellationTokenSource cts = CancellationTokenSource.CreateLinkedTokenSource(parentToken);
if (delay != TimeSpan.Zero) cts.CancelAfter(delay);

return new AutoCancelTokenSource(cts);
}
}
}
48 changes: 48 additions & 0 deletions src/Nethermind/Nethermind.Core/Tasks/WaitForPassingTasks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

namespace Nethermind.Core.Tasks;

public static class Wait
{
/// <summary>
/// Wait for any of the task that passed the predicate and forward the result, or all of the task to complete.
/// </summary>
/// <param name="cond"></param>
/// <param name="tasks"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public static async Task<T> ForPassingTask<T>(Func<T, bool> cond, params IEnumerable<Task<T>> tasks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make it span?
LINQ style naming? AnyWhere or just Any - in LINQ it has optional parameter:https://learn.microsoft.com/en-us/dotnet/api/system.linq.enumerable.any?view=net-8.0#system-linq-enumerable-any-1(system-collections-generic-ienumerable((-0))-system-func((-0-system-boolean)))

Suggested change
public static async Task<T> ForPassingTask<T>(Func<T, bool> cond, params IEnumerable<Task<T>> tasks)
public static async Task<T> AnyWhere<T>(Func<T, bool> predicate, params ReadOnlySpan<Task<T>> tasks)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because its an async code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok the naming comment still stands

{
HashSet<Task<T>> taskSet = new HashSet<Task<T>>(tasks);
while (taskSet.Any())
{
Task<T> resolved = await Task.WhenAny<T>(taskSet);
taskSet.Remove(resolved);

T result = await resolved;

if (cond(result))
{
// Its ok, then immediately return.
return result;
}

if (!taskSet.Any())
{
// No more tasks, just return the last one.
return result;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we return null or just TaskCompleated with default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... don't know.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it does make nullability simple.

}

// Otherwise, we try WhenAny again.
}

throw new UnreachableException();
}
}
29 changes: 29 additions & 0 deletions src/Nethermind/Nethermind.Core/Utils/AutoCancelTokenSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// SPDX-FileCopyrightText: 2025 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading;

namespace Nethermind.Core.Utils;

/// <summary>
/// Automatically cancel and dispose underlying cancellation token source.
/// Make it easy to have golang style defer cancel pattern.
/// </summary>
public readonly struct AutoCancelTokenSource(CancellationTokenSource cancellationTokenSource) : IDisposable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not derive from CancellationTokenSource?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer not to use inheritance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I somewhat agree, but it IS a CancellationTokenSource, so it makes some sense

{
public CancellationToken Token => cancellationTokenSource.Token;

public static AutoCancelTokenSource ThatCancelAfter(TimeSpan delay)
{
CancellationTokenSource cancellationTokenSource = new();
cancellationTokenSource.CancelAfter(delay);
return new AutoCancelTokenSource(cancellationTokenSource);
}

public void Dispose()
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
}
}
16 changes: 5 additions & 11 deletions src/Nethermind/Nethermind.Init/PruningTrieStateFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,11 @@ ILogManager logManager

INodeStorage mainNodeStorage = nodeStorageFactory.WrapKeyValueStore(stateDb);

TrieStore trieStore = syncConfig.TrieHealing
? new HealingTrieStore(
mainNodeStorage,
pruningStrategy,
persistenceStrategy,
logManager)
: new TrieStore(
mainNodeStorage,
pruningStrategy,
persistenceStrategy,
logManager);
TrieStore trieStore = new TrieStore(
mainNodeStorage,
pruningStrategy,
persistenceStrategy,
logManager);

ITrieStore mainWorldTrieStore = trieStore;
PreBlockCaches? preBlockCaches = null;
Expand Down
8 changes: 7 additions & 1 deletion src/Nethermind/Nethermind.Init/Steps/InitializeNetwork.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,13 @@ private async Task Initialize(CancellationToken cancellationToken)
_api.DisposeStack.Push((IAsyncDisposable)container);
}

_api.WorldStateManager!.InitializeNetwork(new GetNodeDataTrieNodeRecovery(_api.SyncPeerPool!, _api.LogManager), new SnapTrieNodeRecovery(_api.SyncPeerPool!, _api.LogManager));
_api.WorldStateManager!.InitializeNetwork(
new PathNodeRecovery(
new NodeDataRecovery(_api.SyncPeerPool!, _api.MainNodeStorage!, _api.LogManager),
new SnapRangeRecovery(_api.SyncPeerPool!, _api.LogManager),
_api.LogManager
)
);

_api.TxGossipPolicy.Policies.Add(new SyncedTxGossipPolicy(_api.SyncModeSelector));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public ResultWrapper<byte[]> eth_getStorageAt(Address address, UInt256 positionI
}
catch (MissingTrieNodeException e)
{
var hash = e.TrieNodeException.NodeHash;
var hash = e.Hash;
return ResultWrapper<byte[]>.Fail($"missing trie node {hash} (path ) state {hash} is not available", ErrorCodes.InvalidInput);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Network.Stats/NodeStatsLight.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ private void UpdateValue(ref decimal? currentValue, decimal newValue)
{
return (long?)(transferSpeedType switch
{
TransferSpeedType.Latency => _averageLatency,
TransferSpeedType.Latency => _averageLatency ?? 10000,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const from some timeout? int.MaxValue?

TransferSpeedType.NodeData => _averageNodesTransferSpeed,
TransferSpeedType.Headers => _averageHeadersTransferSpeed,
TransferSpeedType.Bodies => _averageBodiesTransferSpeed,
Expand Down
34 changes: 14 additions & 20 deletions src/Nethermind/Nethermind.State/Healing/HealingStateTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Diagnostics;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
Expand All @@ -14,15 +15,15 @@ namespace Nethermind.State.Healing;

public class HealingStateTree : StateTree
{
private ITrieNodeRecovery<GetTrieNodesRequest>? _recovery;
private IPathRecovery? _recovery;

[DebuggerStepThrough]
public HealingStateTree(ITrieStore? store, ILogManager? logManager)
: base(store.GetTrieStore(null), logManager)
{
}

public void InitializeNetwork(ITrieNodeRecovery<GetTrieNodesRequest> recovery)
public void InitializeNetwork(IPathRecovery recovery)
{
_recovery = recovery;
}
Expand All @@ -35,7 +36,8 @@ public override ReadOnlySpan<byte> Get(ReadOnlySpan<byte> rawKey, Hash256? rootH
}
catch (MissingTrieNodeException e)
{
if (Recover(e.TrieNodeException.NodeHash, e.GetPathPart(), rootHash ?? RootHash))
Hash256 fullPath = new Hash256(rawKey);
if (Recover(e.Path, e.Hash, fullPath))
{
return base.Get(rawKey, rootHash);
}
Expand All @@ -52,7 +54,8 @@ public override void Set(ReadOnlySpan<byte> rawKey, byte[] value)
}
catch (MissingTrieNodeException e)
{
if (Recover(e.TrieNodeException.NodeHash, e.GetPathPart(), RootHash))
Hash256 fullPath = new Hash256(rawKey);
if (Recover(e.Path, e.Hash, fullPath))
{
base.Set(rawKey, value);
}
Expand All @@ -63,27 +66,18 @@ public override void Set(ReadOnlySpan<byte> rawKey, byte[] value)
}
}

private bool Recover(in ValueHash256 rlpHash, ReadOnlySpan<byte> pathPart, Hash256 rootHash)
private bool Recover(in TreePath missingNodePath, in ValueHash256 hash, Hash256 fullPath)
{
if (_recovery?.CanRecover == true)
if (_recovery is not null)
{
GetTrieNodesRequest request = new()
IDictionary<TreePath, byte[]>? rlps = _recovery.Recover(RootHash, null, missingNodePath, hash, fullPath, default).GetAwaiter().GetResult();
if (rlps is not null)
{
RootHash = rootHash,
AccountAndStoragePaths = new ArrayPoolList<PathGroup>(1)
foreach (KeyValuePair<TreePath, byte[]> kv in rlps)
{
new()
{
Group = [Nibbles.EncodePath(pathPart)]
}
ValueHash256 nodeHash = ValueKeccak.Compute(kv.Value);
TrieStore.Set(kv.Key, nodeHash, kv.Value);
}
};

byte[]? rlp = _recovery.Recover(rlpHash, request).GetAwaiter().GetResult();
if (rlp is not null)
{
TreePath path = TreePath.FromNibble(pathPart);
TrieStore.Set(path, rlpHash, rlp);
return true;
}
}
Expand Down
Loading