Skip to content

Commit

Permalink
feat: use seed in kademlia peer discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Dec 20, 2023
1 parent d6e2887 commit 3edb6a9
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 7 deletions.
13 changes: 13 additions & 0 deletions Libplanet.Net/Protocols/IProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ Task AddPeersAsync(
/// <returns>An awaitable task without value.</returns>
Task RebuildConnectionAsync(int depth, CancellationToken cancellationToken);

/// <summary>
/// Reconstructs network connection between peers on network using seed peers.
/// </summary>
/// <param name="seedPeers">The list of the see peers.</param>
/// <param name="depth">Recursive operation depth to search peers from network.</param>
/// <param name="cancellationToken">A cancellation token used to propagate notification
/// that this operation should be canceled.</param>
/// <returns>An awaitable task without value.</returns>
Task RebuildConnectionAsync(
IEnumerable<BoundPeer> seedPeers,
int depth,
CancellationToken cancellationToken);

/// <summary>
/// Checks the <see cref="KBucket"/> in the <see cref="RoutingTable"/> and if
/// there is an empty <see cref="KBucket"/>, fill it with <see cref="BoundPeer"/>s
Expand Down
34 changes: 34 additions & 0 deletions Libplanet.Net/Protocols/KademliaProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,34 @@ public async Task RebuildConnectionAsync(int depth, CancellationToken cancellati
}
}

/// <inheritdoc />
public async Task RebuildConnectionAsync(
IEnumerable<BoundPeer> seedPeers,
int depth,
CancellationToken cancellationToken)
{
_logger.Verbose("Rebuilding connection using seed peers...");
var history = new ConcurrentBag<BoundPeer>();
var dialHistory = new ConcurrentBag<BoundPeer>();
var tasks = seedPeers.Select(seed =>
FindPeerAsync(
history,
dialHistory,
_address,
seed,
depth,
_requestTimeout,
cancellationToken)).ToList();

try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch (TimeoutException)
{
}
}

/// <inheritdoc />
public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -722,6 +750,12 @@ private async Task ProcessFoundAsync(
"Some responses from neighbors found unexpectedly terminated");
}

if (depth == 1)
{
// depth 1 means spawn FindPeerAsync task of depth 0, and it does nothing.
return;
}

var findPeerTasks = new List<Task>();
BoundPeer closestKnownPeer = closestCandidate.FirstOrDefault();
var count = 0;
Expand Down
27 changes: 24 additions & 3 deletions Libplanet.Net/Protocols/RoutingTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ public class RoutingTable : IRoutingTable
/// <param name="address"><see cref="Address"/> of this peer.</param>
/// <param name="tableSize">The number of buckets in the table.</param>
/// <param name="bucketSize">The size of a single bucket.</param>
/// <param name="seedPeers">The list of the seed peers.
/// If null is given, <see cref="SeedPeers"/> is set to an empty list.</param>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="tableSize"/> or <paramref name="bucketSize"/> is
/// less then or equal to 0.</exception>
public RoutingTable(
Address address,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize)
int bucketSize = Kademlia.BucketSize,
IEnumerable<BoundPeer>? seedPeers = null)
{
if (tableSize <= 0)
{
Expand All @@ -45,6 +48,7 @@ public RoutingTable(
_address = address;
TableSize = tableSize;
BucketSize = bucketSize;
SeedPeers = seedPeers?.ToList() ?? new List<BoundPeer>();
_logger = Log
.ForContext<RoutingTable>()
.ForContext("Source", nameof(RoutingTable));
Expand All @@ -67,6 +71,13 @@ public RoutingTable(
/// </summary>
public int BucketSize { get; }

/// <summary>
/// The list of the seed peers.
/// <remarks>Seed peers are excluded from bound peer selection, and neighbor.</remarks>
/// <seealso cref="PeersToBroadcast"/>
/// </summary>
public IReadOnlyList<BoundPeer> SeedPeers { get; }

/// <inheritdoc />
public int Count => _buckets.Sum(bucket => bucket.Count);

Expand Down Expand Up @@ -144,7 +155,7 @@ public bool Contains(BoundPeer peer)
Peers.FirstOrDefault(peer => peer.Address.Equals(addr));

/// <summary>
/// Removes all peers in the table. This method does not affect static peers.
/// Removes all peers in the table.
/// </summary>
public void Clear()
{
Expand Down Expand Up @@ -179,7 +190,6 @@ public IReadOnlyList<BoundPeer> Neighbors(BoundPeer target, int k, bool includeT
/// <returns>An enumerable of <see cref="BoundPeer"/>.</returns>
public IReadOnlyList<BoundPeer> Neighbors(Address target, int k, bool includeTarget)
{
// TODO: Should include static peers?
var sorted = _buckets
.Where(b => !b.IsEmpty)
.SelectMany(b => b.Peers)
Expand All @@ -200,12 +210,17 @@ public IReadOnlyList<BoundPeer> Neighbors(Address target, int k, bool includeTar

/// <summary>
/// Marks <paramref name="peer"/> checked and refreshes last checked time of the peer.
/// If the given <paramref name="peer"/> is one of the <see cref="SeedPeers"/>,
/// it is not added.
/// </summary>
/// <param name="peer">The <see cref="BoundPeer"/> to check.</param>
/// <param name="start"><see cref="DateTimeOffset"/> at the beginning of the check.</param>
/// <param name="end"><see cref="DateTimeOffset"/> at the end of the check.</param>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="peer"/> is <see langword="null"/>.</exception>
/// <exception cref="ArgumentException">
/// Thrown when given <paramref name="peer"/>'s <see cref="Address"/> is equal to
/// <see cref="RoutingTable"/>'s <see cref="Address"/>.</exception>
public void Check(BoundPeer peer, DateTimeOffset start, DateTimeOffset end)
=> BucketOf(peer).Check(peer, start, end);

Expand All @@ -218,6 +233,12 @@ internal void AddPeer(BoundPeer peer, DateTimeOffset updated)
nameof(peer));
}

if (SeedPeers.Any(seed => peer.Address.Equals(seed.Address)))
{
_logger.Verbose("A seed peer is disallowed to add in the routing table.");
return;
}

_logger.Debug("Adding peer {Peer} to the routing table...", peer);
BucketOf(peer).AddPeer(peer, updated);
}
Expand Down
22 changes: 18 additions & 4 deletions Libplanet.Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public Swarm(

Options = options ?? new SwarmOptions();
TxCompletion = new TxCompletion<BoundPeer>(BlockChain, GetTxsAsync, BroadcastTxs);
RoutingTable = new RoutingTable(Address, Options.TableSize, Options.BucketSize);
RoutingTable = new RoutingTable(
Address,
Options.TableSize,
Options.BucketSize,
Options.StaticPeers);

// FIXME: after the initialization of NetMQTransport is fully converted to asynchronous
// code, the portion initializing the swarm in Agent.cs in NineChronicles should be
Expand Down Expand Up @@ -1432,9 +1436,19 @@ private async Task RebuildConnectionAsync(
try
{
await Task.Delay(period, cancellationToken);
await PeerDiscovery.RebuildConnectionAsync(
Kademlia.MaxDepth,
cancellationToken);
if (RoutingTable.SeedPeers.Any())
{
await PeerDiscovery.RebuildConnectionAsync(
RoutingTable.SeedPeers,
1,
cancellationToken);
}
else
{
await PeerDiscovery.RebuildConnectionAsync(
Kademlia.MaxDepth,
cancellationToken);
}
}
catch (OperationCanceledException e)
{
Expand Down

0 comments on commit 3edb6a9

Please sign in to comment.