diff --git a/.github/workflows/push_docker_image.yml b/.github/workflows/push_docker_image.yml index 305995207..90f4bfd83 100644 --- a/.github/workflows/push_docker_image.yml +++ b/.github/workflows/push_docker_image.yml @@ -11,6 +11,8 @@ on: - qa-* - 2022q3-worldboss - release/* + # This branch is for testing only. Use until the next(v200080) release. + - test/action-evaluation-publisher-elapse-metric tags: - "*" workflow_dispatch: diff --git a/Lib9c b/Lib9c index fe86a78e2..7a62fd6d8 160000 --- a/Lib9c +++ b/Lib9c @@ -1 +1 @@ -Subproject commit fe86a78e29d7d15486779b133bea21ba0ec63f30 +Subproject commit 7a62fd6d8a0628d554d9a89b4691b80f4243181b diff --git a/NineChronicles.Headless.Executable.Tests/Commands/AccountCommandTest.cs b/NineChronicles.Headless.Executable.Tests/Commands/AccountCommandTest.cs index 7bb3fa3e0..08d7a2759 100644 --- a/NineChronicles.Headless.Executable.Tests/Commands/AccountCommandTest.cs +++ b/NineChronicles.Headless.Executable.Tests/Commands/AccountCommandTest.cs @@ -46,7 +46,7 @@ public void Balance(StoreType storeType) var stateKeyValueStore = new RocksDBKeyValueStore(statesPath); var stateStore = new TrieStateStore(stateKeyValueStore); IStagePolicy stagePolicy = new VolatileStagePolicy(); - IBlockPolicy blockPolicy = new BlockPolicySource(Logger.None).GetPolicy(); + IBlockPolicy blockPolicy = new BlockPolicySource().GetPolicy(); ActionEvaluator actionEvaluator = new ActionEvaluator( _ => blockPolicy.BlockAction, new BlockChainStates(store, stateStore), diff --git a/NineChronicles.Headless.Executable.Tests/Commands/ChainCommandTest.cs b/NineChronicles.Headless.Executable.Tests/Commands/ChainCommandTest.cs index 17f04ca9a..dc21068eb 100644 --- a/NineChronicles.Headless.Executable.Tests/Commands/ChainCommandTest.cs +++ b/NineChronicles.Headless.Executable.Tests/Commands/ChainCommandTest.cs @@ -90,7 +90,7 @@ public void Inspect(StoreType storeType) IStore store = storeType.CreateStore(_storePath); IStateStore stateStore = new TrieStateStore(new RocksDBKeyValueStore(Path.Combine(_storePath, "states"))); IStagePolicy stagePolicy = new VolatileStagePolicy(); - IBlockPolicy blockPolicy = new BlockPolicySource(Logger.None).GetTestPolicy(); + IBlockPolicy blockPolicy = new BlockPolicySource().GetTestPolicy(); ActionEvaluator actionEvaluator = new ActionEvaluator( _ => blockPolicy.BlockAction, new BlockChainStates(store, stateStore), @@ -151,7 +151,7 @@ public void Truncate(StoreType storeType) IStore store = storeType.CreateStore(_storePath); IStateStore stateStore = new TrieStateStore(new RocksDBKeyValueStore(Path.Combine(_storePath, "states"))); IStagePolicy stagePolicy = new VolatileStagePolicy(); - IBlockPolicy blockPolicy = new BlockPolicySource(Logger.None).GetTestPolicy(); + IBlockPolicy blockPolicy = new BlockPolicySource().GetTestPolicy(); ActionEvaluator actionEvaluator = new ActionEvaluator( _ => blockPolicy.BlockAction, new BlockChainStates(store, stateStore), @@ -230,7 +230,7 @@ public void PruneState(StoreType storeType) var stateKeyValueStore = new RocksDBKeyValueStore(statesPath); var stateStore = new TrieStateStore(stateKeyValueStore); IStagePolicy stagePolicy = new VolatileStagePolicy(); - IBlockPolicy blockPolicy = new BlockPolicySource(Logger.None).GetPolicy(); + IBlockPolicy blockPolicy = new BlockPolicySource().GetPolicy(); ActionEvaluator actionEvaluator = new ActionEvaluator( _ => blockPolicy.BlockAction, new BlockChainStates(store, stateStore), @@ -272,7 +272,7 @@ public void Snapshot(StoreType storeType) var stateKeyValueStore = new RocksDBKeyValueStore(statesPath); var stateStore = new TrieStateStore(stateKeyValueStore); IStagePolicy stagePolicy = new VolatileStagePolicy(); - IBlockPolicy blockPolicy = new BlockPolicySource(Logger.None).GetPolicy(); + IBlockPolicy blockPolicy = new BlockPolicySource().GetPolicy(); ActionEvaluator actionEvaluator = new ActionEvaluator( _ => blockPolicy.BlockAction, new BlockChainStates(store, stateStore), diff --git a/NineChronicles.Headless.Executable/Commands/ChainCommand.cs b/NineChronicles.Headless.Executable/Commands/ChainCommand.cs index 817fc859e..22ebe8425 100644 --- a/NineChronicles.Headless.Executable/Commands/ChainCommand.cs +++ b/NineChronicles.Headless.Executable/Commands/ChainCommand.cs @@ -114,7 +114,7 @@ public void Inspect( } IStagePolicy stagePolicy = new VolatileStagePolicy(); - IBlockPolicy blockPolicy = new BlockPolicySource(Logger.None).GetPolicy(); + IBlockPolicy blockPolicy = new BlockPolicySource().GetPolicy(); IStore store = storeType.CreateStore(storePath); var stateStore = new TrieStateStore(new DefaultKeyValueStore(null)); if (!(store.GetCanonicalChainId() is { } chainId)) diff --git a/NineChronicles.Headless.Executable/Commands/ReplayCommand.cs b/NineChronicles.Headless.Executable/Commands/ReplayCommand.cs index 19ebea278..9fb831a5a 100644 --- a/NineChronicles.Headless.Executable/Commands/ReplayCommand.cs +++ b/NineChronicles.Headless.Executable/Commands/ReplayCommand.cs @@ -475,7 +475,7 @@ private static (FileStream? fs, StreamWriter? sw) GetOutputFileStream( var genesisBlock = store.GetBlock(genesisBlockHash); // Make BlockChain and blocks. - var policy = new BlockPolicySource(Logger.None).GetPolicy(); + var policy = new BlockPolicySource().GetPolicy(); var stagePolicy = new VolatileStagePolicy(); var stateKeyValueStore = new RocksDBKeyValueStore(Path.Combine(storePath, "states")); var stateStore = new TrieStateStore(stateKeyValueStore); @@ -525,7 +525,7 @@ private Transaction LoadTx(string txPath) private ActionEvaluator GetActionEvaluator(BlockChain blockChain) { - var policy = new BlockPolicySource(Logger.None).GetPolicy(); + var policy = new BlockPolicySource().GetPolicy(); IActionLoader actionLoader = new NCActionLoader(); return new ActionEvaluator( _ => policy.BlockAction, diff --git a/NineChronicles.Headless.Executable/Program.cs b/NineChronicles.Headless.Executable/Program.cs index 5b3d9696b..c7cab8112 100644 --- a/NineChronicles.Headless.Executable/Program.cs +++ b/NineChronicles.Headless.Executable/Program.cs @@ -353,35 +353,6 @@ public async Task Run( KeyStore = Web3KeyStore.DefaultKeyStore, }; - if (headlessConfig.GraphQLServer) - { - string? secretToken = null; - if (headlessConfig.GraphQLSecretTokenPath is { }) - { - var buffer = new byte[40]; - new SecureRandom().NextBytes(buffer); - secretToken = Convert.ToBase64String(buffer); - await File.WriteAllTextAsync(headlessConfig.GraphQLSecretTokenPath, secretToken); - } - - var graphQLNodeServiceProperties = new GraphQLNodeServiceProperties - { - GraphQLServer = headlessConfig.GraphQLServer, - GraphQLListenHost = headlessConfig.GraphQLHost, - GraphQLListenPort = headlessConfig.GraphQLPort, - SecretToken = secretToken, - NoCors = headlessConfig.NoCors, - UseMagicOnion = headlessConfig.RpcServer, - HttpOptions = headlessConfig.RpcServer && headlessConfig.RpcHttpServer == true - ? new GraphQLNodeServiceProperties.MagicOnionHttpOptions( - $"{headlessConfig.RpcListenHost}:{headlessConfig.RpcListenPort}") - : (GraphQLNodeServiceProperties.MagicOnionHttpOptions?)null, - }; - - var graphQLService = new GraphQLService(graphQLNodeServiceProperties, standaloneContext, configuration); - hostBuilder = graphQLService.Configure(hostBuilder); - } - var properties = NineChroniclesNodeServiceProperties .GenerateLibplanetNodeServiceProperties( headlessConfig.AppProtocolVersionString, @@ -488,7 +459,10 @@ IActionLoader MakeSingleActionLoader() .AddAspNetCoreInstrumentation() .AddPrometheusExporter()); }); - hostBuilder.UseNineChroniclesNode(nineChroniclesProperties, standaloneContext); + + NineChroniclesNodeService service = + NineChroniclesNodeService.Create(nineChroniclesProperties, standaloneContext); + ActionEvaluationPublisher publisher; if (headlessConfig.RpcServer) { var context = new RpcContext @@ -501,7 +475,7 @@ IActionLoader MakeSingleActionLoader() headlessConfig.RpcListenPort, headlessConfig.RpcRemoteServer == true ); - var publisher = new ActionEvaluationPublisher( + publisher = new ActionEvaluationPublisher( standaloneContext.NineChroniclesNodeService!.BlockRenderer, standaloneContext.NineChroniclesNodeService!.ActionRenderer, standaloneContext.NineChroniclesNodeService!.ExceptionRenderer, @@ -512,6 +486,11 @@ IActionLoader MakeSingleActionLoader() new ConcurrentDictionary() ); + hostBuilder.UseNineChroniclesNode( + nineChroniclesProperties, + standaloneContext, + publisher, + service); hostBuilder.UseNineChroniclesRPC( rpcProperties, publisher, @@ -519,6 +498,58 @@ IActionLoader MakeSingleActionLoader() configuration ); } + else + { + var context = new RpcContext + { + RpcRemoteSever = false + }; + publisher = new ActionEvaluationPublisher( + standaloneContext.NineChroniclesNodeService!.BlockRenderer, + standaloneContext.NineChroniclesNodeService!.ActionRenderer, + standaloneContext.NineChroniclesNodeService!.ExceptionRenderer, + standaloneContext.NineChroniclesNodeService!.NodeStatusRenderer, + IPAddress.Loopback.ToString(), + 0, + context, + new ConcurrentDictionary() + ); + hostBuilder.UseNineChroniclesNode( + nineChroniclesProperties, + standaloneContext, + publisher, + service); + } + + if (headlessConfig.GraphQLServer) + { + string? secretToken = null; + if (headlessConfig.GraphQLSecretTokenPath is { }) + { + var buffer = new byte[40]; + new SecureRandom().NextBytes(buffer); + secretToken = Convert.ToBase64String(buffer); + await File.WriteAllTextAsync(headlessConfig.GraphQLSecretTokenPath, secretToken); + } + + var graphQLNodeServiceProperties = new GraphQLNodeServiceProperties + { + GraphQLServer = headlessConfig.GraphQLServer, + GraphQLListenHost = headlessConfig.GraphQLHost, + GraphQLListenPort = headlessConfig.GraphQLPort, + SecretToken = secretToken, + NoCors = headlessConfig.NoCors, + UseMagicOnion = headlessConfig.RpcServer, + HttpOptions = headlessConfig.RpcServer && headlessConfig.RpcHttpServer == true + ? new GraphQLNodeServiceProperties.MagicOnionHttpOptions( + $"{headlessConfig.RpcListenHost}:{headlessConfig.RpcListenPort}") + : (GraphQLNodeServiceProperties.MagicOnionHttpOptions?)null, + }; + + var graphQLService = + new GraphQLService(graphQLNodeServiceProperties, standaloneContext, configuration, publisher); + hostBuilder = graphQLService.Configure(hostBuilder); + } await hostBuilder.RunConsoleAsync(cancellationToken ?? Context.CancellationToken); } diff --git a/NineChronicles.Headless.Executable/appsettings.json b/NineChronicles.Headless.Executable/appsettings.json index 9df03ea16..87b56cfff 100644 --- a/NineChronicles.Headless.Executable/appsettings.json +++ b/NineChronicles.Headless.Executable/appsettings.json @@ -115,7 +115,7 @@ "ContentType": "application/json", "StatusCode": 429 }, - "IpBanThresholdCount": 10, + "IpBanThresholdCount": 5, "IpBanMinute" : 60, "IpBanResponse": { "Content": "{ \"message\": \"Your Ip has been banned.\" }", @@ -127,6 +127,6 @@ "EnableManaging": false, "ManagementTimeMinutes": 10, "TxIntervalMinutes": 10, - "ThresholdCount": 50 + "ThresholdCount": 29 } } diff --git a/NineChronicles.Headless.Tests/GraphQLStartupTest.cs b/NineChronicles.Headless.Tests/GraphQLStartupTest.cs index bc7a98e76..2568550be 100644 --- a/NineChronicles.Headless.Tests/GraphQLStartupTest.cs +++ b/NineChronicles.Headless.Tests/GraphQLStartupTest.cs @@ -1,3 +1,5 @@ +using System.Collections.Concurrent; +using Lib9c.Renderers; using Microsoft.AspNetCore.Cors.Infrastructure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -16,7 +18,17 @@ public GraphQLStartupTest() { _configuration = new ConfigurationBuilder().AddInMemoryCollection().Build(); var standaloneContext = CreateStandaloneContext(); - _startup = new GraphQLService.GraphQLStartup(_configuration, standaloneContext); + var publisher = new ActionEvaluationPublisher( + new BlockRenderer(), + new ActionRenderer(), + new ExceptionRenderer(), + new NodeStatusRenderer(), + "", + 0, + new RpcContext(), + new ConcurrentDictionary() + ); + _startup = new GraphQLService.GraphQLStartup(_configuration, standaloneContext, publisher); } [Theory] diff --git a/NineChronicles.Headless.Tests/GraphTypes/StandaloneSubscriptionTest.cs b/NineChronicles.Headless.Tests/GraphTypes/StandaloneSubscriptionTest.cs index 76f309a1f..27b8cec53 100644 --- a/NineChronicles.Headless.Tests/GraphTypes/StandaloneSubscriptionTest.cs +++ b/NineChronicles.Headless.Tests/GraphTypes/StandaloneSubscriptionTest.cs @@ -53,14 +53,16 @@ public async Task SubscribeTipChangedEvent() lastCommit: GenerateBlockCommit(BlockChain.Tip.Index, BlockChain.Tip.Hash, GenesisValidators)); BlockChain.Append(block, GenerateBlockCommit(block.Index, block.Hash, GenesisValidators)); - var result = await ExecuteSubscriptionQueryAsync("subscription { tipChanged { index hash } }"); - // var data = (Dictionary)((ExecutionNode) result.Data!).ToValue()!; + + Assert.Equal(index, BlockChain.Tip.Index); + await Task.Delay(TimeSpan.FromSeconds(1)); + + var result = await ExecuteSubscriptionQueryAsync("subscription { tipChanged { index hash } }"); Assert.IsType(result); var subscribeResult = (SubscriptionExecutionResult)result; - Assert.Equal(index, BlockChain.Tip.Index); var stream = subscribeResult.Streams!.Values.FirstOrDefault(); - var rawEvents = await stream.Take((int)index); + var rawEvents = await stream.Take(1); Assert.NotNull(rawEvents); var events = (Dictionary)((ExecutionNode)rawEvents.Data!).ToValue()!; diff --git a/NineChronicles.Headless/ActionEvaluationPublisher.cs b/NineChronicles.Headless/ActionEvaluationPublisher.cs index 9d07b8d95..21a8bd643 100644 --- a/NineChronicles.Headless/ActionEvaluationPublisher.cs +++ b/NineChronicles.Headless/ActionEvaluationPublisher.cs @@ -24,7 +24,9 @@ using Libplanet.Types.Tx; using MagicOnion.Client; using MessagePack; +using Microsoft.Extensions.Caching.Memory; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; using Nekoyume.Action; using Nekoyume.Model.State; using Nekoyume.Shared.Hubs; @@ -43,8 +45,11 @@ public class ActionEvaluationPublisher : BackgroundService private readonly ExceptionRenderer _exceptionRenderer; private readonly NodeStatusRenderer _nodeStatusRenderer; - private readonly ConcurrentDictionary _clients = new ConcurrentDictionary(); - private readonly ConcurrentDictionary _clientsByDevice = new ConcurrentDictionary(); + private readonly ConcurrentDictionary _clients = new(); + private readonly ConcurrentDictionary _clientsByDevice = new(); + private readonly ConcurrentDictionary> _clientsByIp = new(); + private readonly ConcurrentDictionary, List> _clientsIpsList = new(); + private readonly IMemoryCache _cache; private RpcContext _context; private ConcurrentDictionary _sentryTraces; @@ -67,6 +72,9 @@ public ActionEvaluationPublisher( _port = port; _context = context; _sentryTraces = sentryTraces; + var memoryCacheOptions = new MemoryCacheOptions(); + var options = Options.Create(memoryCacheOptions); + _cache = new MemoryCache(options); var meter = new Meter("NineChronicles"); meter.CreateObservableGauge( @@ -82,6 +90,18 @@ public ActionEvaluationPublisher( new Measurement(this.GetClientsCountByDevice("other"), new[] { new KeyValuePair("device", "other") }), }, description: "Number of RPC clients connected by device."); + meter.CreateObservableGauge( + "ninechronicles_clients_count_by_ips", + () => new[] + { + new Measurement( + GetClientsCountByIp(10), + new KeyValuePair("account-type", "multi")), + new Measurement( + GetClientsCountByIp(0), + new KeyValuePair("account-type", "all")), + }, + description: "Number of RPC clients connected grouped by ips."); ActionEvaluationHub.OnClientDisconnected += RemoveClient; } @@ -151,6 +171,41 @@ public List
GetClientsByDevice(string device) .ToList(); } + public void AddClientAndIp(string ipAddress, string clientAddress) + { + if (!_clientsByIp.ContainsKey(ipAddress)) + { + _clientsByIp[ipAddress] = new HashSet(); + } + + _clientsByIp[ipAddress].Add(clientAddress); + } + + public int GetClientsCountByIp(int minimum) + { + var finder = new IdGroupFinder(_cache); + var groups = finder.FindGroups(_clientsByIp); + return groups.Where(group => group.IDs.Count >= minimum) + .Sum(group => group.IDs.Count); + } + + public ConcurrentDictionary, List> GetClientsByIp(int minimum) + { + var finder = new IdGroupFinder(_cache); + var groups = finder.FindGroups(_clientsByIp); + ConcurrentDictionary, List> clientsIpList = new(); + foreach (var group in groups) + { + if (group.IDs.Count >= minimum) + { + clientsIpList.TryAdd(group.IPs.ToList(), group.IDs.ToList()); + } + } + + return new ConcurrentDictionary, List>( + clientsIpList.OrderByDescending(x => x.Value.Count)); + } + public override async Task StopAsync(CancellationToken cancellationToken) { foreach (Client? client in _clients.Values) @@ -211,9 +266,104 @@ private async void RemoveClient(string clientAddressHex) RemoveClientByDevice(clientAddress); await RemoveClient(clientAddress); } - catch (Exception) + catch (Exception e) + { + Log.Error(e, "[{ClientAddress}] Error while removing client.", clientAddressHex); + } + } + + private sealed class IdGroupFinder + { + private Dictionary> adjacencyList = new(); + private HashSet visited = new(); + private readonly IMemoryCache _memoryCache; + + public IdGroupFinder(IMemoryCache memoryCache) + { + _memoryCache = memoryCache; + } + + public List<(HashSet IPs, HashSet IDs)> FindGroups(ConcurrentDictionary> dict) + { + // Create a serialized version of the input for caching purposes + var serializedInput = "key"; + + // Check cache + if (_memoryCache.TryGetValue(serializedInput, out List<(HashSet IPs, HashSet IDs)> cachedResult)) + { + return cachedResult; + } + + // Step 1: Construct the adjacency list + foreach (var kvp in dict) + { + var ip = kvp.Key; + if (!adjacencyList.ContainsKey(ip)) + { + adjacencyList[ip] = new List(); + } + + foreach (var id in kvp.Value) + { + adjacencyList[ip].Add(id); + + if (!adjacencyList.ContainsKey(id)) + { + adjacencyList[id] = new List(); + } + adjacencyList[id].Add(ip); + } + } + + // Step 2: DFS to find connected components + var groups = new List<(HashSet IPs, HashSet IDs)>(); + foreach (var node in adjacencyList.Keys) + { + if (!visited.Contains(node)) + { + var ips = new HashSet(); + var ids = new HashSet(); + DFS(node, ips, ids, dict); + groups.Add((ips, ids)); + } + } + + // Cache the result before returning. Here we set a sliding expiration of 1 hour. + var cacheEntryOptions = new MemoryCacheEntryOptions + { + AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(10) + }; + _memoryCache.Set(serializedInput, groups, cacheEntryOptions); + + return groups; + } + + private void DFS(string node, HashSet ips, HashSet ids, ConcurrentDictionary> dict) { - Log.Error("[{ClientAddress }] Client "); + if (visited.Contains(node)) + { + return; + } + + visited.Add(node); + + // if node is an IP + if (dict.ContainsKey(node)) + { + ips.Add(node); + } + else + { + ids.Add(node); + } + + foreach (var neighbor in adjacencyList[node]) + { + if (!visited.Contains(neighbor)) + { + DFS(neighbor, ips, ids, dict); + } + } } } diff --git a/NineChronicles.Headless/BlockChainService.cs b/NineChronicles.Headless/BlockChainService.cs index 6f6ac1117..c28425588 100644 --- a/NineChronicles.Headless/BlockChainService.cs +++ b/NineChronicles.Headless/BlockChainService.cs @@ -118,7 +118,7 @@ public UnaryResult GetState(byte[] addressBytes, byte[] blockHashBytes) var hash = new BlockHash(blockHashBytes); IValue state = _blockChain.GetStates(new[] { address }, hash)[0]; // FIXME: Null과 null 구분해서 반환해야 할 듯 - byte[] encoded = _codec.Encode(state ?? new Null()); + byte[] encoded = _codec.Encode(state ?? Null.Value); return new UnaryResult(encoded); } @@ -148,7 +148,7 @@ public UnaryResult> GetStateBulk(IEnumerable IReadOnlyList values = _blockChain.GetStates(addresses, hash); for (int i = 0; i < addresses.Length; i++) { - result.TryAdd(addresses[i].ToByteArray(), _codec.Encode(values[i] ?? new Null())); + result.TryAdd(addresses[i].ToByteArray(), _codec.Encode(values[i] ?? Null.Value)); } return new UnaryResult>(result); @@ -180,6 +180,18 @@ public UnaryResult GetTip() return new UnaryResult(headerBytes); } + public UnaryResult GetBlockHash(long blockIndex) + { + try + { + return new UnaryResult(_codec.Encode(_blockChain[blockIndex].Hash.Bencoded)); + } + catch (ArgumentOutOfRangeException) + { + return new UnaryResult(_codec.Encode(Null.Value)); + } + } + public UnaryResult GetNextTxNonce(byte[] addressBytes) { var address = new Address(addressBytes); diff --git a/NineChronicles.Headless/GraphQLService.cs b/NineChronicles.Headless/GraphQLService.cs index df6a26ff3..80be90055 100644 --- a/NineChronicles.Headless/GraphQLService.cs +++ b/NineChronicles.Headless/GraphQLService.cs @@ -38,12 +38,18 @@ public class GraphQLService private StandaloneContext StandaloneContext { get; } private GraphQLNodeServiceProperties GraphQlNodeServiceProperties { get; } private IConfiguration Configuration { get; } + private ActionEvaluationPublisher Publisher { get; } - public GraphQLService(GraphQLNodeServiceProperties properties, StandaloneContext standaloneContext, IConfiguration configuration) + public GraphQLService( + GraphQLNodeServiceProperties properties, + StandaloneContext standaloneContext, + IConfiguration configuration, + ActionEvaluationPublisher publisher) { GraphQlNodeServiceProperties = properties; StandaloneContext = standaloneContext; Configuration = configuration; + Publisher = publisher; } public IHostBuilder Configure(IHostBuilder hostBuilder) @@ -53,7 +59,7 @@ public IHostBuilder Configure(IHostBuilder hostBuilder) return hostBuilder.ConfigureWebHostDefaults(builder => { - builder.UseStartup(x => new GraphQLStartup(x.Configuration, StandaloneContext)); + builder.UseStartup(x => new GraphQLStartup(x.Configuration, StandaloneContext, Publisher)); builder.ConfigureAppConfiguration( (context, builder) => { @@ -92,14 +98,19 @@ public IHostBuilder Configure(IHostBuilder hostBuilder) internal class GraphQLStartup { - public GraphQLStartup(IConfiguration configuration, StandaloneContext standaloneContext) + public GraphQLStartup( + IConfiguration configuration, + StandaloneContext standaloneContext, + ActionEvaluationPublisher publisher) { Configuration = configuration; StandaloneContext = standaloneContext; + Publisher = publisher; } public IConfiguration Configuration { get; } public StandaloneContext StandaloneContext; + public ActionEvaluationPublisher Publisher; public void ConfigureServices(IServiceCollection services) { @@ -172,7 +183,8 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env) Dictionary> ipSignerList = new(); app.UseMiddleware( StandaloneContext, - ipSignerList); + ipSignerList, + Publisher); } app.UseMiddleware(); diff --git a/NineChronicles.Headless/GraphTypes/MultiAccountType.cs b/NineChronicles.Headless/GraphTypes/MultiAccountType.cs new file mode 100644 index 000000000..b82f3acec --- /dev/null +++ b/NineChronicles.Headless/GraphTypes/MultiAccountType.cs @@ -0,0 +1,26 @@ +using System.Collections.Generic; +using GraphQL.Types; + +namespace NineChronicles.Headless.GraphTypes +{ + public class MultiAccountInfoGraphType : ObjectGraphType + { + public class MultiAccountInfo + { + public string Key { get; set; } = null!; + public List Ips { get; set; } = null!; + public List Agents { get; set; } = null!; + public int IpsCount { get; set; } + public int AgentsCount { get; set; } + } + + public MultiAccountInfoGraphType() + { + Field("key", x => x.Key); + Field>("ips", resolve: context => context.Source.Ips); + Field>("agents", resolve: context => context.Source.Agents); + Field("ipsCount", resolve: context => context.Source.Ips.Count); + Field("agentsCount", resolve: context => context.Source.Agents.Count); + } + } +} diff --git a/NineChronicles.Headless/GraphTypes/RpcInformationQuery.cs b/NineChronicles.Headless/GraphTypes/RpcInformationQuery.cs index ea505a377..d392712b5 100644 --- a/NineChronicles.Headless/GraphTypes/RpcInformationQuery.cs +++ b/NineChronicles.Headless/GraphTypes/RpcInformationQuery.cs @@ -48,6 +48,48 @@ public RpcInformationQuery(ActionEvaluationPublisher publisher) string device = context.GetArgument("device"); return publisher.GetClientsByDevice(device); }); + Field>( + name: "clientsCountByIps", + arguments: new QueryArguments( + new QueryArgument> + { + Name = "minimum" + } + ), + description: "clients count connected to this node grouped by Ip addresses.", + resolve: context => + { + int minimum = context.GetArgument("minimum"); + return publisher.GetClientsCountByIp(minimum); + }); + Field>>( + name: "clientsByIps", + arguments: new QueryArguments( + new QueryArgument> + { + Name = "minimum" + } + ), + description: "clients connected to this node grouped by Ip addresses.", + resolve: context => + { + int minimum = context.GetArgument("minimum"); + var a = new List(); + var data = publisher.GetClientsByIp(minimum); + foreach (var i in data) + { + var b = new MultiAccountInfoGraphType.MultiAccountInfo + { + Key = i.Key.First(), + Ips = i.Key, + Agents = i.Value, + IpsCount = i.Key.Count, + AgentsCount = i.Value.Count, + }; + a.Add(b); + } + return a.OrderByDescending(x => x.AgentsCount); + }); } } } diff --git a/NineChronicles.Headless/GraphTypes/StandaloneSubscription.cs b/NineChronicles.Headless/GraphTypes/StandaloneSubscription.cs index 4fdf34bac..7ff4b2054 100644 --- a/NineChronicles.Headless/GraphTypes/StandaloneSubscription.cs +++ b/NineChronicles.Headless/GraphTypes/StandaloneSubscription.cs @@ -122,7 +122,7 @@ public PreloadStateType() private BlockHeader? _tipHeader; - private ISubject _subject = new ReplaySubject(); + private ISubject _subject = new ReplaySubject(1); private ISubject _transactionSubject = new Subject(); diff --git a/NineChronicles.Headless/HostBuilderExtensions.cs b/NineChronicles.Headless/HostBuilderExtensions.cs index ae5d84326..65afd87f9 100644 --- a/NineChronicles.Headless/HostBuilderExtensions.cs +++ b/NineChronicles.Headless/HostBuilderExtensions.cs @@ -26,15 +26,11 @@ public static class HostBuilderExtensions public static IHostBuilder UseNineChroniclesNode( this IHostBuilder builder, NineChroniclesNodeServiceProperties properties, - StandaloneContext context + StandaloneContext context, + ActionEvaluationPublisher actionEvaluationPublisher, + NineChroniclesNodeService service ) { - NineChroniclesNodeService service = - NineChroniclesNodeService.Create(properties, context); - var rpcContext = new RpcContext - { - RpcRemoteSever = false - }; return builder.ConfigureServices(services => { services.AddHostedService(provider => service); @@ -53,19 +49,8 @@ StandaloneContext context { services.AddSingleton(provider => libplanetNodeServiceProperties); } - services.AddSingleton(provider => - { - return new ActionEvaluationPublisher( - context.NineChroniclesNodeService!.BlockRenderer, - context.NineChroniclesNodeService!.ActionRenderer, - context.NineChroniclesNodeService!.ExceptionRenderer, - context.NineChroniclesNodeService!.NodeStatusRenderer, - IPAddress.Loopback.ToString(), - 0, - rpcContext, - provider.GetRequiredService>() - ); - }); + + services.AddSingleton(_ => actionEvaluationPublisher); }); } diff --git a/NineChronicles.Headless/Middleware/GrpcMultiAccountManagementMiddleware.cs b/NineChronicles.Headless/Middleware/GrpcMultiAccountManagementMiddleware.cs index 4b396dc33..0f3d995f2 100644 --- a/NineChronicles.Headless/Middleware/GrpcMultiAccountManagementMiddleware.cs +++ b/NineChronicles.Headless/Middleware/GrpcMultiAccountManagementMiddleware.cs @@ -51,25 +51,11 @@ public override async Task UnaryServerHandler( { var agent = new Address(addClientAddressBytes); var httpContext = context.GetHttpContext(); + var remoteIp = httpContext.Connection.RemoteIpAddress!.ToString(); if (_options.Value.EnableManaging) { - if (!_ipSignerList.ContainsKey(httpContext.Connection.RemoteIpAddress!.ToString())) - { - _logger.Information( - "[GRPC-MULTI-ACCOUNT-MANAGER] Creating a new list for IP: {IP}", - httpContext.Connection.RemoteIpAddress!.ToString()); - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()] = new HashSet
(); - } - else - { - _logger.Information( - "[GRPC-MULTI-ACCOUNT-MANAGER] List already created for IP: {IP} Count: {Count}", - httpContext.Connection.RemoteIpAddress!.ToString(), - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Count); - } - - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Add(agent); + UpdateIpSignerList(remoteIp, agent); } } @@ -77,25 +63,11 @@ public override async Task UnaryServerHandler( { var agent = new Address(getNextTxNonceAddressBytes); var httpContext = context.GetHttpContext(); + var remoteIp = httpContext.Connection.RemoteIpAddress!.ToString(); if (_options.Value.EnableManaging) { - if (!_ipSignerList.ContainsKey(httpContext.Connection.RemoteIpAddress!.ToString())) - { - _logger.Information( - "[GRPC-MULTI-ACCOUNT-MANAGER] Creating a new list for IP: {IP}", - httpContext.Connection.RemoteIpAddress!.ToString()); - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()] = new HashSet
(); - } - else - { - _logger.Information( - "[GRPC-MULTI-ACCOUNT-MANAGER] List already created for IP: {IP} Count: {Count}", - httpContext.Connection.RemoteIpAddress!.ToString(), - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Count); - } - - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Add(agent); + UpdateIpSignerList(remoteIp, agent); } } @@ -104,18 +76,19 @@ public override async Task UnaryServerHandler( Transaction tx = Transaction.Deserialize(txBytes); var httpContext = context.GetHttpContext(); + var remoteIp = httpContext.Connection.RemoteIpAddress!.ToString(); if (_options.Value.EnableManaging) { var agent = tx.Signer; - if (_ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Count > + if (_ipSignerList[remoteIp].Count > _options.Value.ThresholdCount) { _logger.Information( "[GRPC-MULTI-ACCOUNT-MANAGER] IP: {IP} List Count: {Count}, AgentAddresses: {Agent}", - httpContext.Connection.RemoteIpAddress!.ToString(), - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Count, - _ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()]); + remoteIp, + _ipSignerList[remoteIp].Count, + _ipSignerList[remoteIp]); if (!MultiAccountManagementList.ContainsKey(agent)) { if (!MultiAccountTxIntervalTracker.ContainsKey(agent)) @@ -139,7 +112,7 @@ public override async Task UnaryServerHandler( _logger.Information( $"[GRPC-MULTI-ACCOUNT-MANAGER] Managing Agent {agent} for " + $"{_options.Value.ManagementTimeMinutes} minutes due to " + - $"{_ipSignerList[httpContext.Connection.RemoteIpAddress!.ToString()].Count} associated accounts."); + $"{_ipSignerList[remoteIp].Count} associated accounts."); ManageMultiAccount(agent); MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now; throw new RpcException(new Status(StatusCode.Cancelled, "Request cancelled.")); @@ -172,5 +145,31 @@ public override async Task UnaryServerHandler( return await base.UnaryServerHandler(request, context, continuation); } + + private void UpdateIpSignerList(string ip, Address agent) + { + if (!_ipSignerList.ContainsKey(ip)) + { + _logger.Information( + "[GRPC-MULTI-ACCOUNT-MANAGER] Creating a new list for IP: {IP}", + ip); + _ipSignerList[ip] = new HashSet
(); + } + else + { + _logger.Information( + "[GRPC-MULTI-ACCOUNT-MANAGER] List already created for IP: {IP} Count: {Count}", + ip, + _ipSignerList[ip].Count); + } + + _ipSignerList[ip].Add(agent); + AddClientIpInfo(agent, ip); + } + + private void AddClientIpInfo(Address agentAddress, string ipAddress) + { + _actionEvaluationPublisher.AddClientAndIp(ipAddress, agentAddress.ToString()); + } } } diff --git a/NineChronicles.Headless/Middleware/HttpMultiAccountManagementMiddleware.cs b/NineChronicles.Headless/Middleware/HttpMultiAccountManagementMiddleware.cs index cf6588182..977c8dfb1 100644 --- a/NineChronicles.Headless/Middleware/HttpMultiAccountManagementMiddleware.cs +++ b/NineChronicles.Headless/Middleware/HttpMultiAccountManagementMiddleware.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.IO; +using System.Linq; using System.Text.RegularExpressions; using System.Threading.Tasks; using Libplanet.Common; @@ -8,6 +9,7 @@ using Libplanet.Types.Tx; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.Options; +using Nekoyume.Action; using NineChronicles.Headless.Properties; using Serilog; using ILogger = Serilog.ILogger; @@ -23,18 +25,21 @@ public class HttpMultiAccountManagementMiddleware private StandaloneContext _standaloneContext; private readonly Dictionary> _ipSignerList; private readonly IOptions _options; + private ActionEvaluationPublisher _publisher; public HttpMultiAccountManagementMiddleware( RequestDelegate next, StandaloneContext standaloneContext, Dictionary> ipSignerList, - IOptions options) + IOptions options, + ActionEvaluationPublisher publisher) { _next = next; _logger = Log.Logger.ForContext(); _standaloneContext = standaloneContext; _ipSignerList = ipSignerList; _options = options; + _publisher = publisher; } private static void ManageMultiAccount(Address agent) @@ -53,7 +58,7 @@ public async Task InvokeAsync(HttpContext context) if (context.Request.Protocol == "HTTP/1.1") { context.Request.EnableBuffering(); - var remoteIp = context.Connection.RemoteIpAddress; + var remoteIp = context.Connection.RemoteIpAddress!.ToString(); var body = await new StreamReader(context.Request.Body).ReadToEndAsync(); context.Request.Body.Seek(0, SeekOrigin.Begin); if (_options.Value.EnableManaging) @@ -63,7 +68,7 @@ public async Task InvokeAsync(HttpContext context) try { var agent = new Address(body.Split("\\\"")[1].Split("0x")[1]); - UpdateIpSignerList(remoteIp!.ToString(), agent); + UpdateIpSignerList(remoteIp, agent); } catch (Exception ex) { @@ -83,63 +88,72 @@ public async Task InvokeAsync(HttpContext context) byte[] bytes = ByteUtil.ParseHex(txPayload); Transaction tx = Transaction.Deserialize(bytes); var agent = tx.Signer; - if (_ipSignerList.ContainsKey(context.Connection.RemoteIpAddress!.ToString())) + var action = NCActionUtils.ToAction(tx.Actions.Actions.First()); + + // Only monitoring actions not used in the launcher + if (action is not Stake + and not ClaimStakeReward + and not TransferAsset) { - if (_ipSignerList[context.Connection.RemoteIpAddress!.ToString()].Count > _options.Value.ThresholdCount) + if (_ipSignerList.ContainsKey(remoteIp)) { - _logger.Information( - "[GRAPHQL-MULTI-ACCOUNT-MANAGER] IP: {IP} List Count: {Count}, AgentAddresses: {Agent}", - context.Connection.RemoteIpAddress!.ToString(), - _ipSignerList[context.Connection.RemoteIpAddress!.ToString()].Count, - _ipSignerList[context.Connection.RemoteIpAddress!.ToString()]); - - if (!MultiAccountManagementList.ContainsKey(agent)) + if (_ipSignerList[remoteIp].Count > _options.Value.ThresholdCount) { - if (!MultiAccountTxIntervalTracker.ContainsKey(agent)) + _logger.Information( + "[GRAPHQL-MULTI-ACCOUNT-MANAGER] IP: {IP} List Count: {Count}, AgentAddresses: {Agent}", + remoteIp, + _ipSignerList[remoteIp].Count, + _ipSignerList[remoteIp]); + + if (!MultiAccountManagementList.ContainsKey(agent)) { - _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Adding agent {agent} to the agent tracker."); - MultiAccountTxIntervalTracker.Add(agent, DateTimeOffset.Now); + if (!MultiAccountTxIntervalTracker.ContainsKey(agent)) + { + _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Adding agent {agent} to the agent tracker."); + MultiAccountTxIntervalTracker.Add(agent, DateTimeOffset.Now); + } + else + { + if ((DateTimeOffset.Now - MultiAccountTxIntervalTracker[agent]).Minutes >= _options.Value.TxIntervalMinutes) + { + _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Resetting Agent {agent}'s time because it has been more than {_options.Value.TxIntervalMinutes} minutes since the last transaction."); + MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now; + } + else + { + _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Managing Agent {agent} for {_options.Value.ManagementTimeMinutes} minutes due to {_ipSignerList[remoteIp].Count} associated accounts."); + ManageMultiAccount(agent); + MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now; + await CancelRequestAsync(context); + return; + } + } } else { - if ((DateTimeOffset.Now - MultiAccountTxIntervalTracker[agent]).Minutes >= _options.Value.TxIntervalMinutes) + var currentManagedTime = (DateTimeOffset.Now - MultiAccountManagementList[agent]).Minutes; + if (currentManagedTime > _options.Value.ManagementTimeMinutes) { - _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Resetting Agent {agent}'s time because it has been more than {_options.Value.TxIntervalMinutes} minutes since the last transaction."); - MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now; + _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Restoring Agent {agent} after {_options.Value.ManagementTimeMinutes} minutes."); + RestoreMultiAccount(agent); + MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now.AddMinutes(-_options.Value.TxIntervalMinutes); + _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Current time: {DateTimeOffset.Now} Added time: {DateTimeOffset.Now.AddMinutes(-_options.Value.TxIntervalMinutes)}."); } else { - _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Managing Agent {agent} for {_options.Value.ManagementTimeMinutes} minutes due to {_ipSignerList[context.Connection.RemoteIpAddress!.ToString()].Count} associated accounts."); - ManageMultiAccount(agent); - MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now; + _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Agent {agent} is in managed status for the next {_options.Value.ManagementTimeMinutes - currentManagedTime} minutes."); await CancelRequestAsync(context); return; } } } - else - { - var currentManagedTime = (DateTimeOffset.Now - MultiAccountManagementList[agent]).Minutes; - if (currentManagedTime > _options.Value.ManagementTimeMinutes) - { - _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Restoring Agent {agent} after {_options.Value.ManagementTimeMinutes} minutes."); - RestoreMultiAccount(agent); - MultiAccountTxIntervalTracker[agent] = DateTimeOffset.Now.AddMinutes(-_options.Value.TxIntervalMinutes); - _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Current time: {DateTimeOffset.Now} Added time: {DateTimeOffset.Now.AddMinutes(-_options.Value.TxIntervalMinutes)}."); - } - else - { - _logger.Information($"[GRAPHQL-MULTI-ACCOUNT-MANAGER] Agent {agent} is in managed status for the next {_options.Value.ManagementTimeMinutes - currentManagedTime} minutes."); - await CancelRequestAsync(context); - return; - } - } + } + else + { + UpdateIpSignerList(remoteIp, agent); } } - else - { - UpdateIpSignerList(remoteIp!.ToString(), agent); - } + } catch (Exception ex) { @@ -155,7 +169,7 @@ public async Task InvokeAsync(HttpContext context) await _next(context); } - public void UpdateIpSignerList(string ip, Address agent) + private void UpdateIpSignerList(string ip, Address agent) { if (!_ipSignerList.ContainsKey(ip)) { @@ -173,6 +187,12 @@ public void UpdateIpSignerList(string ip, Address agent) } _ipSignerList[ip].Add(agent); + AddClientIpInfo(agent, ip); + } + + private void AddClientIpInfo(Address agentAddress, string ipAddress) + { + _publisher.AddClientAndIp(ipAddress, agentAddress.ToString()); } private async Task CancelRequestAsync(HttpContext context) diff --git a/NineChronicles.Headless/NineChroniclesNodeService.cs b/NineChronicles.Headless/NineChroniclesNodeService.cs index 155068c0e..9e075af5b 100644 --- a/NineChronicles.Headless/NineChroniclesNodeService.cs +++ b/NineChronicles.Headless/NineChroniclesNodeService.cs @@ -84,11 +84,10 @@ public NineChroniclesNodeService( Properties = properties; LogEventLevel logLevel = LogEventLevel.Debug; - var blockPolicySource = new BlockPolicySource(Log.Logger, logLevel); IStagePolicy stagePolicy = new NCStagePolicy(txLifeTime, txQuotaPerSigner); - BlockRenderer = blockPolicySource.BlockRenderer; - ActionRenderer = blockPolicySource.ActionRenderer; + BlockRenderer = new BlockRenderer(); + ActionRenderer = new ActionRenderer(); ExceptionRenderer = new ExceptionRenderer(); NodeStatusRenderer = new NodeStatusRenderer(); var renderers = new List(); @@ -101,12 +100,12 @@ public NineChroniclesNodeService( if (Properties.Render) { - renderers.Add(blockPolicySource.BlockRenderer); - renderers.Add(blockPolicySource.LoggedActionRenderer); + renderers.Add(BlockRenderer); + renderers.Add(new LoggedActionRenderer(ActionRenderer, Log.Logger, logLevel)); } else if (Properties.LogActionRenders) { - renderers.Add(blockPolicySource.BlockRenderer); + renderers.Add(BlockRenderer); // The following "nullRenderer" does nothing. It's just for filling // the LoggedActionRenderer() constructor's parameter: IActionRenderer nullRenderer = new AnonymousActionRenderer(); @@ -120,7 +119,7 @@ public NineChroniclesNodeService( } else { - renderers.Add(blockPolicySource.LoggedBlockRenderer); + renderers.Add(new LoggedRenderer(BlockRenderer, Log.Logger, logLevel)); } if (strictRendering) @@ -242,7 +241,7 @@ StandaloneContext context internal static IBlockPolicy GetBlockPolicy(NetworkType networkType, IActionLoader actionLoader) { - var source = new BlockPolicySource(Log.Logger, LogEventLevel.Debug, actionLoader); + var source = new BlockPolicySource(actionLoader); return networkType switch { NetworkType.Main => source.GetPolicy(), @@ -255,7 +254,7 @@ internal static IBlockPolicy GetBlockPolicy(NetworkType networkType, IActionLoad } internal static IBlockPolicy GetTestBlockPolicy() => - new BlockPolicySource(Log.Logger, LogEventLevel.Debug).GetTestPolicy(); + new BlockPolicySource().GetTestPolicy(); public Task CheckPeer(string addr) => NodeService?.CheckPeer(addr) ?? throw new InvalidOperationException(); diff --git a/NineChronicles.RPC.Shared b/NineChronicles.RPC.Shared index 5e02fdfea..2dcbbb19a 160000 --- a/NineChronicles.RPC.Shared +++ b/NineChronicles.RPC.Shared @@ -1 +1 @@ -Subproject commit 5e02fdfea99cb91602f43288330c10ca8e916e3e +Subproject commit 2dcbbb19a0c90f3f41de03506802bbd527c0aaba