Skip to content

Commit

Permalink
temp
Browse files Browse the repository at this point in the history
  • Loading branch information
limebell committed Oct 29, 2024
1 parent b7441a0 commit 7bcec29
Show file tree
Hide file tree
Showing 17 changed files with 520 additions and 8 deletions.
17 changes: 15 additions & 2 deletions sdk/node/Libplanet.Node.Extensions/LibplanetServicesExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ public static ILibplanetNodeBuilder AddLibplanetNode(
services.AddSingleton<IValidateOptions<ValidatorOptions>, ValidatorOptionsValidator>();

services.AddOptions<SoloOptions>()
.Bind(configuration.GetSection(SoloOptions.Position));
.Bind(configuration.GetSection(SoloOptions.Position));
services.AddSingleton<IConfigureOptions<SoloOptions>, SoloOptionsConfigurator>();

services.AddOptions<PubsubSwarmOptions>()
.Bind(configuration.GetSection(PubsubSwarmOptions.Position));
services
.AddSingleton<IConfigureOptions<PubsubSwarmOptions>, PubsubSwarmOptionsConfigurator>();

services.AddSingleton<PolicyService>();
services.AddSingleton<StoreService>();
services.AddSingleton(s => (IStoreService)s.GetRequiredService<StoreService>());
Expand All @@ -55,12 +60,20 @@ public static ILibplanetNodeBuilder AddLibplanetNode(
services.AddSingleton<TransactionService>();

var nodeBuilder = new LibplanetNodeBuilder(services);
nodeBuilder.WithTemp();

if (configuration.IsOptionsEnabled(SoloOptions.Position))
{
nodeBuilder.WithSolo();
}

#pragma warning disable
/*
if (configuration.IsOptionsEnabled(PubsubSwarmOptions.Position))
{
Console.WriteLine("Start PubsubSwarm");
nodeBuilder.WithPubsubSwarm();
}
*/
if (configuration.IsOptionsEnabled(SwarmOptions.Position))
{
nodeBuilder.WithSwarm();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using Libplanet.Node.Protocols;
using Libplanet.Node.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Nethermind.Libp2p.Stack;

namespace Libplanet.Node.Extensions.NodeBuilder;

Expand All @@ -17,6 +19,14 @@ internal LibplanetNodeBuilder(IServiceCollection services)

public string[] Scopes => [.. _scopeList];

public ILibplanetNodeBuilder WithTemp()
{
Services.AddLibp2p(builder => builder);
Services.AddSingleton<Temp>();
_scopeList.Add("Temp");
return this;
}

public ILibplanetNodeBuilder WithSolo()
{
Services.AddHostedService<SoloProposeService>();
Expand All @@ -39,4 +49,13 @@ public ILibplanetNodeBuilder WithValidator()
_scopeList.Add("Validator");
return this;
}

public ILibplanetNodeBuilder WithPubsubSwarm()
{
Services.AddSingleton<IMessageChannel, MessageChannel>();
Services.AddLibp2p(builder => builder.AddAppLayerProtocol<PingPongProtocol>());
Services.AddHostedService<PubsubSwarmService>();
_scopeList.Add("PubsubSwarm");
return this;
}
}
3 changes: 2 additions & 1 deletion sdk/node/Libplanet.Node/Libplanet.Node.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
<ProjectReference Include="..\..\..\src\Libplanet.Store\Libplanet.Store.csproj" />
<ProjectReference Include="..\..\..\src\Libplanet\Libplanet.csproj" />
<ProjectReference Include="..\..\..\src\Libplanet.Net\Libplanet.Net.csproj" />
<ProjectReference Include="..\..\..\..\..\suho\suho-lib9c\.Lib9c.Plugin.Shared\Lib9c.Plugin.Shared.csproj" />
<ProjectReference Include="..\..\..\dotnet-libp2p\src\libp2p\Libp2p\Libp2p.csproj" />
<ProjectReference Include="..\..\..\dotnet-libp2p\src\libp2p\Libp2p.Core\Libp2p.Core.csproj" />
</ItemGroup>

</Project>
18 changes: 18 additions & 0 deletions sdk/node/Libplanet.Node/Options/PubsubSwarmOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using System.ComponentModel;
using Libplanet.Node.DataAnnotations;

namespace Libplanet.Node.Options;

[Options(Position)]
public class PubsubSwarmOptions : OptionsBase<PubsubSwarmOptions>
{
public const string Position = "PubsubSwarm";

public bool Proposer { get; set; }

public long BlockInterval { get; set; } = 4000;

[PrivateKey]
[Description("The private key of the node.")]
public string PrivateKey { get; set; } = string.Empty;
}
21 changes: 21 additions & 0 deletions sdk/node/Libplanet.Node/Options/PubsubSwarmOptionsConfigurator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using Libplanet.Common;
using Libplanet.Crypto;
using Microsoft.Extensions.Logging;

namespace Libplanet.Node.Options;

internal sealed class PubsubSwarmOptionsConfigurator(
ILogger<PubsubSwarmOptionsConfigurator> logger)
: OptionsConfiguratorBase<PubsubSwarmOptions>
{
protected override void OnConfigure(PubsubSwarmOptions options)
{
if (options.PrivateKey == string.Empty)
{
options.PrivateKey = ByteUtil.Hex(new PrivateKey().ByteArray);
logger.LogWarning(
"Node's private key is not set. A new private key is generated: {PrivateKey}",
options.PrivateKey);
}
}
}
39 changes: 39 additions & 0 deletions sdk/node/Libplanet.Node/Protocols/MessageContentCodec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Libplanet.Net.Messages;

namespace Libplanet.Node.Protocols;

public static class MessageContentCodec
{
public static MessageContent Deserialize(byte[] bytes)
{
var rawFrames = bytes;
var type = rawFrames[0];
rawFrames = rawFrames.Skip(1).ToArray();
var dataFrames = new List<byte[]>();
var frameCount = BitConverter.ToInt32(rawFrames[..4]);
rawFrames = rawFrames.Skip(4).ToArray();
for (int i = 0; i < frameCount; i++)
{
var frameSize = BitConverter.ToInt32(rawFrames[..4]);
dataFrames.Add(rawFrames.Skip(4).Take(frameSize).ToArray());
rawFrames = rawFrames.Skip(4 + frameSize).ToArray();
}

return NetMQMessageCodec.CreateMessage(
(MessageContent.MessageType)type,
dataFrames.ToArray());
}

public static byte[] Serialize(MessageContent messageContent)
{
IEnumerable<byte> ba = [(byte)messageContent.Type];
ba = ba.Concat(BitConverter.GetBytes(messageContent.DataFrames.Count()));
foreach (var bytes in messageContent.DataFrames)
{
ba = ba.Concat(BitConverter.GetBytes(bytes.Length));
ba = ba.Concat(bytes);
}

return ba.ToArray();
}
}
65 changes: 65 additions & 0 deletions sdk/node/Libplanet.Node/Protocols/PingPongProtocol.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
using System.Buffers;
using Libplanet.Node.Services;
using Nethermind.Libp2p.Core;

namespace Libplanet.Node.Protocols
{
internal class PingPongProtocol(IMessageChannel messageChannel) : IProtocol
{
public string Id => "/blockchain/ping-pong/1.0.0";

public async Task DialAsync(
IChannel downChannel,
IChannelFactory? upChannelFactory,
IPeerContext context)
{
TimeSpan elapsed = TimeSpan.Zero;

// Keep connection for 5 seconds
while (elapsed < TimeSpan.FromSeconds(5))
{
if (messageChannel.TryGetMessageToSend(
context.RemotePeer.Address,
out byte[]? msg))
{
if (msg is not null)
{
await downChannel.WriteAsync(
new ReadOnlySequence<byte>(msg));
ReadOnlySequence<byte> read =
await downChannel.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow();
messageChannel.ReceiveMessage(
context.RemotePeer.Address,
read.ToArray(),
_ => { });
}

elapsed = TimeSpan.Zero;
}
else
{
await Task.Delay(50);
elapsed += TimeSpan.FromMilliseconds(50);
}
}
}

public async Task ListenAsync(
IChannel downChannel,
IChannelFactory? upChannelFactory,
IPeerContext context)
{
while (true)
{
ReadOnlySequence<byte> read =
await downChannel.ReadAsync(0, ReadBlockingMode.WaitAny).OrThrow();
messageChannel.ReceiveMessage(
context.RemotePeer.Address,
read.ToArray(),
msg => _ =
downChannel.WriteAsync(
new ReadOnlySequence<byte>(msg)));
}
}
}
}
23 changes: 23 additions & 0 deletions sdk/node/Libplanet.Node/Services/IMessageChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Libplanet.Net.Messages;
using Libplanet.Node.Protocols;
using Multiformats.Address;

namespace Libplanet.Node.Services;

public interface IMessageChannel
{
EventHandler<(Multiaddress, byte[], Action<byte[]>)>? OnMessageReceived
{
get;
set;
}

void SendMessage(Multiaddress address, byte[] message);

void ReceiveMessage(
Multiaddress sender,
byte[] message,
Action<byte[]> callback);

bool TryGetMessageToSend(Multiaddress address, out byte[]? message);
}
55 changes: 55 additions & 0 deletions sdk/node/Libplanet.Node/Services/MessageChannel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// SPDX-FileCopyrightText: 2024 Demerzel Solutions Limited
// SPDX-License-Identifier: MIT

using System.Collections.Concurrent;
using Multiformats.Address;

namespace Libplanet.Node.Services;

public class MessageChannel : IMessageChannel
{
private readonly ConcurrentDictionary<Multiaddress, ConcurrentBag<byte[]>>
_messagesToSend = new();

public EventHandler<(Multiaddress, byte[], Action<byte[]>)>? OnMessageReceived
{
get;
set;
}

public void SendMessage(Multiaddress address, byte[] message)
{
if (!_messagesToSend.TryGetValue(address, out ConcurrentBag<byte[]>? bag))
{
bag = [];
_messagesToSend.TryAdd(address, bag);
}

bag.Add(message);
}

public void ReceiveMessage(
Multiaddress sender,
byte[] message,
Action<byte[]> callback)
{
OnMessageReceived?.Invoke(this, (sender, message, callback));
}

public bool TryGetMessageToSend(Multiaddress address, out byte[]? message)
{
if (_messagesToSend.TryGetValue(address, out ConcurrentBag<byte[]>? bag))
{
if (bag.IsEmpty)
{
message = null;
return false;
}

return bag.TryTake(out message);
}

message = null;
return false;
}
}
Loading

0 comments on commit 7bcec29

Please sign in to comment.