From 19badebf10a0f1cadd8417cfc664a7cddd2b1f1b Mon Sep 17 00:00:00 2001 From: Apollo3zehn Date: Fri, 15 Mar 2024 23:29:37 +0100 Subject: [PATCH] Follow Nexus changes --- .editorconfig | 29 +- .github/workflows/build-and-publish.yml | 14 +- .gitignore | 1 + .vscode/launch.json | 21 - CHANGELOG.md | 4 + Directory.Build.props | 12 +- .../Nexus.Benchmarks/Nexus.Benchmarks.csproj | 2 +- benchmarks/Nexus.Benchmarks/PipeVsTcp.cs | 185 ++++--- benchmarks/Nexus.Benchmarks/Program.cs | 99 ++-- setup/docker/docker-compose.yml | 4 +- src/Nexus.Sources.Remote/CastMemoryManager.cs | 29 +- src/Nexus.Sources.Remote/DataSourceTypes.cs | 114 ++--- .../Nexus.Sources.Remote.csproj | 4 +- .../PropertiesExtensions.cs | 97 ++-- src/Nexus.Sources.Remote/Remote.cs | 478 +++++++++--------- .../RemoteCommunicator.cs | 440 ++++++++-------- src/remoting/dotnet-remoting/Remoting.cs | 23 +- .../dotnet-remoting/dotnet-remoting.csproj | 2 +- .../Nexus.Sources.Remote.Tests.csproj | 19 +- .../Nexus.Sources.Remote.Tests/RemoteTests.cs | 415 ++++++++------- .../SetupDockerTests.cs | 149 +++--- .../dotnet/remote.csproj | 4 +- version.json | 2 +- 23 files changed, 1051 insertions(+), 1096 deletions(-) delete mode 100644 .vscode/launch.json diff --git a/.editorconfig b/.editorconfig index 0ccff01..32c963d 100755 --- a/.editorconfig +++ b/.editorconfig @@ -1,10 +1,31 @@ -# How to format: -# (1) Add dotnet_diagnostic.XXXX.severity = error -# (2) Run dotnet-format: dotnet format --diagnostics XXXX +# How to apply single rule: +# Run dotnet format --diagnostics XXXX --severity info + +# How to apply all rules: +# Run dotnet format --severity error/info/warn/ + +[*] +trim_trailing_whitespace = true [*.cs] # "run cleanup": https://betterprogramming.pub/enforce-net-code-style-with-editorconfig-d2f0d79091ac -# TODO: build real editorconfig file: https://github.com/dotnet/roslyn/blob/main/.editorconfig +# TODO: build real editorconfig file: https://github.com/dotnet/roslyn/blob/main/.editorconfig + +# Prefer var +csharp_style_var_for_built_in_types = false +csharp_style_var_when_type_is_apparent = true +csharp_style_var_elsewhere = true +dotnet_diagnostic.IDE0007.severity = warning + +# Make field +dotnet_diagnostic.IDE0044.severity = warning + +# Use file scoped namespace declarations +dotnet_diagnostic.IDE0161.severity = error +csharp_style_namespace_declarations = file_scoped + +# Collection initialization can be simplified +dotnet_diagnostic.IDE0300.severity = warning # Enable naming rule violation errors on build (alternative: dotnet_analyzer_diagnostic.category-Style.severity = error) dotnet_diagnostic.IDE1006.severity = error diff --git a/.github/workflows/build-and-publish.yml b/.github/workflows/build-and-publish.yml index aeb9d4b..cb138a2 100644 --- a/.github/workflows/build-and-publish.yml +++ b/.github/workflows/build-and-publish.yml @@ -55,14 +55,14 @@ jobs: - name: Build run: | dotnet build -c Release /p:GeneratePackage=true src/remoting/dotnet-remoting/dotnet-remoting.csproj - python -m build --wheel --outdir artifacts/packages --no-isolation src/remoting/python-remoting + python -m build --wheel --outdir artifacts/package --no-isolation src/remoting/python-remoting - name: Test run: | dotnet test -c Release --filter TestCategory=local pyright pytest - sudo bash tests/Nexus.Sources.Remote.Tests/SetupDockerTests.sh + # sudo bash tests/Nexus.Sources.Remote.Tests/SetupDockerTests.sh - name: Upload Artifacts uses: actions/upload-artifact@v3 @@ -70,7 +70,7 @@ jobs: name: artifacts path: | artifacts/tag_body.txt - artifacts/packages/ + artifacts/package/release/ outputs: is_release: ${{ env.IS_RELEASE }} @@ -94,13 +94,13 @@ jobs: # GitHub Package Registry is broken by design: https://github.community/t/download-from-github-package-registry-without-authentication/14407/138 - name: Nuget package (MyGet) - run: dotnet nuget push 'artifacts/packages/*.nupkg' --api-key ${MYGET_API_KEY} --source https://www.myget.org/F/apollo3zehn-dev/api/v3/index.json + run: dotnet nuget push 'artifacts/package/release/*.nupkg' --api-key ${MYGET_API_KEY} --source https://www.myget.org/F/apollo3zehn-dev/api/v3/index.json env: MYGET_API_KEY: ${{ secrets.MYGET_API_KEY }} # GitHub Package Registry does not support Python packages: https://github.community/t/pypi-compatible-github-package-registry/14615 - name: Python package (MyGet) - run: 'for filePath in artifacts/packages/*.whl; do curl -k -X POST https://www.myget.org/F/apollo3zehn-dev/python/upload -H "Authorization: Bearer ${MYGET_API_KEY}" -F "data=@$filePath"; done' + run: 'for filePath in artifacts/package/*.whl; do curl -k -X POST https://www.myget.org/F/apollo3zehn-dev/python/upload -H "Authorization: Bearer ${MYGET_API_KEY}" -F "data=@$filePath"; done' env: MYGET_API_KEY: ${{ secrets.MYGET_API_KEY }} @@ -130,11 +130,11 @@ jobs: body_path: artifacts/tag_body.txt - name: Nuget package (Nuget) - run: dotnet nuget push 'artifacts/packages/*.nupkg' --api-key ${NUGET_API_KEY} --source https://api.nuget.org/v3/index.json + run: dotnet nuget push 'artifacts/package/release/*.nupkg' --api-key ${NUGET_API_KEY} --source https://api.nuget.org/v3/index.json env: NUGET_API_KEY: ${{ secrets.NUGET_API_KEY }} - name: Python Package (PyPI) - run: twine upload artifacts/packages/*.whl -u__token__ -p"${PYPI_API_KEY}" + run: twine upload artifacts/package/*.whl -u__token__ -p"${PYPI_API_KEY}" env: PYPI_API_KEY: ${{ secrets.PYPI_API_KEY }} \ No newline at end of file diff --git a/.gitignore b/.gitignore index 4339ec8..4bbc1fa 100755 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .vs/ .venv/ + artifacts/ BenchmarkDotNet.Artifacts diff --git a/.vscode/launch.json b/.vscode/launch.json deleted file mode 100644 index 64627f9..0000000 --- a/.vscode/launch.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "version": "0.2.0", - "configurations": [ - { - "name": ".NET Core Launch (console)", - "type": "coreclr", - "request": "launch", - "preLaunchTask": "build", - "program": "${workspaceFolder}/src/Nexus.Sources.Remote/bin/Debug/net7.0/Nexus.Sources.Remote.dll", - "args": [], - "cwd": "${workspaceFolder}/src/Nexus.Sources.Remote", - "console": "internalConsole", - "stopAtEntry": false - }, - { - "name": ".NET Core Attach", - "type": "coreclr", - "request": "attach" - } - ] -} \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 473f34d..176b3e0 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## v2.0.0-beta.24 - 2024-03-15 + +- Follow Nexus changes. + ## v2.0.0-beta.2 - 2023-09-26 Follow Nexus changes. diff --git a/Directory.Build.props b/Directory.Build.props index 1211bf3..db678da 100755 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -2,7 +2,7 @@ true - net7.0 + net8.0 enable enable true @@ -10,14 +10,8 @@ https://www.myget.org/F/apollo3zehn-dev/api/v3/index.json - - - - $([MSBuild]::NormalizePath($(MSBuildThisFileDirectory)artifacts)) - $(ArtifactsPath)/obj/$(MSBuildProjectName) - $(BaseIntermediateOutputPath)/$(Configuration) - $(ArtifactsPath)/bin/$(MSBuildProjectName)/$(Configuration) - $(ArtifactsPath)/packages + true + $(MSBuildThisFileDirectory)artifacts \ No newline at end of file diff --git a/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj b/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj index dcf68b0..9857d80 100644 --- a/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj +++ b/benchmarks/Nexus.Benchmarks/Nexus.Benchmarks.csproj @@ -6,7 +6,7 @@ - + diff --git a/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs b/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs index 06c5e5e..53689fa 100644 --- a/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs +++ b/benchmarks/Nexus.Benchmarks/PipeVsTcp.cs @@ -6,117 +6,116 @@ using System.Reflection; using System.Runtime.InteropServices; -namespace Nexus.Benchmarks +namespace Nexus.Benchmarks; + +[MemoryDiagnoser] +public class PipeVsTcp { - [MemoryDiagnoser] - public class PipeVsTcp + private Process _pipeProcess = default!; + private Process _tcpProcess = default!; + private TcpClient _tcpClient = default!; + + [GlobalSetup] + public void GlobalSetup() { - private Process _pipeProcess = default!; - private Process _tcpProcess = default!; - private TcpClient _tcpClient = default!; + // assembly path + var assemblyPath = Assembly.GetExecutingAssembly().Location; - [GlobalSetup] - public void GlobalSetup() + // run pipe process + var pipePsi = new ProcessStartInfo("dotnet") { - // assembly path - var assemblyPath = Assembly.GetExecutingAssembly().Location; - - // run pipe process - var pipePsi = new ProcessStartInfo("dotnet") - { - Arguments = $"{assemblyPath} pipe", - UseShellExecute = false, - RedirectStandardInput = true, - RedirectStandardOutput = true, - RedirectStandardError = true - }; - - _pipeProcess = new Process() { StartInfo = pipePsi }; - _pipeProcess.Start(); - - // run tcp process - var tcpPsi = new ProcessStartInfo("dotnet") - { - Arguments = $"{assemblyPath} tcp", - UseShellExecute = false, - RedirectStandardInput = true, - RedirectStandardOutput = true, - RedirectStandardError = true - }; - - _tcpProcess = new Process() { StartInfo = tcpPsi }; - _tcpProcess.Start(); - - var tcpListener = new TcpListener(IPAddress.Parse("127.0.0.1"), 55555); - tcpListener.Start(); - - _tcpClient = tcpListener.AcceptTcpClient(); - _tcpClient.NoDelay = true; - } - - [GlobalCleanup] - public void GlobalCleanup() + Arguments = $"{assemblyPath} pipe", + UseShellExecute = false, + RedirectStandardInput = true, + RedirectStandardOutput = true, + RedirectStandardError = true + }; + + _pipeProcess = new Process() { StartInfo = pipePsi }; + _pipeProcess.Start(); + + // run tcp process + var tcpPsi = new ProcessStartInfo("dotnet") { - _pipeProcess?.Kill(); - _tcpProcess?.Kill(); - } + Arguments = $"{assemblyPath} tcp", + UseShellExecute = false, + RedirectStandardInput = true, + RedirectStandardOutput = true, + RedirectStandardError = true + }; - [Params(Program.MIN_LENGTH, 100, 10_000, 1_000_000, Program.MAX_LENGTH)] - public int N; + _tcpProcess = new Process() { StartInfo = tcpPsi }; + _tcpProcess.Start(); - [Benchmark(Baseline = true)] - public Memory Pipe() - { - var pipeReadStream = _pipeProcess.StandardOutput.BaseStream; - var pipeWriteStream = _pipeProcess.StandardInput.BaseStream; - using var owner = MemoryPool.Shared.Rent(N * sizeof(int)); + var tcpListener = new TcpListener(IPAddress.Parse("127.0.0.1"), 55555); + tcpListener.Start(); - return ReadFromStream(pipeReadStream, pipeWriteStream, owner.Memory); - } + _tcpClient = tcpListener.AcceptTcpClient(); + _tcpClient.NoDelay = true; + } - [Benchmark()] - public Memory Tcp() - { - var tcpReadStream = _tcpClient.GetStream(); - var pipeWriteStream = _tcpProcess.StandardInput.BaseStream; - using var owner = MemoryPool.Shared.Rent(N * sizeof(int)); + [GlobalCleanup] + public void GlobalCleanup() + { + _pipeProcess?.Kill(); + _tcpProcess?.Kill(); + } - return ReadFromStream(tcpReadStream, pipeWriteStream, owner.Memory); - } + [Params(Program.MIN_LENGTH, 100, 10_000, 1_000_000, Program.MAX_LENGTH)] + public int N; - private Memory ReadFromStream(Stream readStream, Stream writeStream, Memory buffer) - { - // trigger - var Nbuffer = BitConverter.GetBytes(N); - writeStream.Write(Nbuffer); - writeStream.Flush(); + [Benchmark(Baseline = true)] + public Memory Pipe() + { + var pipeReadStream = _pipeProcess.StandardOutput.BaseStream; + var pipeWriteStream = _pipeProcess.StandardInput.BaseStream; + using var owner = MemoryPool.Shared.Rent(N * sizeof(int)); + + return ReadFromStream(pipeReadStream, pipeWriteStream, owner.Memory); + } + + [Benchmark()] + public Memory Tcp() + { + var tcpReadStream = _tcpClient.GetStream(); + var pipeWriteStream = _tcpProcess.StandardInput.BaseStream; + using var owner = MemoryPool.Shared.Rent(N * sizeof(int)); + + return ReadFromStream(tcpReadStream, pipeWriteStream, owner.Memory); + } - // receive data - var remaining = N * sizeof(int); - var offset = 0; + private Memory ReadFromStream(Stream readStream, Stream writeStream, Memory buffer) + { + // trigger + var Nbuffer = BitConverter.GetBytes(N); + writeStream.Write(Nbuffer); + writeStream.Flush(); - while (remaining > 0) - { - var span = buffer.Slice(offset, remaining).Span; - var readBytes = readStream.Read(span); + // receive data + var remaining = N * sizeof(int); + var offset = 0; - if (readBytes == 0) - throw new Exception("The child process terminated early."); + while (remaining > 0) + { + var span = buffer.Slice(offset, remaining).Span; + var readBytes = readStream.Read(span); - remaining -= readBytes; - offset += readBytes; - } + if (readBytes == 0) + throw new Exception("The child process terminated early."); - var intBuffer = MemoryMarshal.Cast(buffer.Span); + remaining -= readBytes; + offset += readBytes; + } - // validate first 3 values - for (int i = 0; i < Math.Min(N, 3); i++) - { - if (intBuffer[i] != i) - throw new Exception($"Invalid data received. Data is {intBuffer[i]}, index = {i}."); - } + var intBuffer = MemoryMarshal.Cast(buffer.Span); - return buffer; + // validate first 3 values + for (int i = 0; i < Math.Min(N, 3); i++) + { + if (intBuffer[i] != i) + throw new Exception($"Invalid data received. Data is {intBuffer[i]}, index = {i}."); } + + return buffer; } } diff --git a/benchmarks/Nexus.Benchmarks/Program.cs b/benchmarks/Nexus.Benchmarks/Program.cs index 216ead0..3106f24 100644 --- a/benchmarks/Nexus.Benchmarks/Program.cs +++ b/benchmarks/Nexus.Benchmarks/Program.cs @@ -3,72 +3,71 @@ using System.Runtime.InteropServices; // results are here: https://stackoverflow.com/questions/68347138/ipc-performance-anonymous-pipe-vs-socket/68347139#68347139 -namespace Nexus.Benchmarks +namespace Nexus.Benchmarks; + +public class Program { - public class Program - { - public const int MIN_LENGTH = 1; - public const int MAX_LENGTH = 10_000_000; + public const int MIN_LENGTH = 1; + public const int MAX_LENGTH = 10_000_000; - static void Main(string[] args) + static void Main(string[] args) + { + if (args.Length == 0) { - if (!args.Any()) - { - var summary = BenchmarkRunner.Run(); - } - else - { - var data = MemoryMarshal - .AsBytes( - Enumerable - .Range(0, MAX_LENGTH) - .ToArray()) - .ToArray(); + _ = BenchmarkRunner.Run(); + } + else + { + var data = MemoryMarshal + .AsBytes( + Enumerable + .Range(0, MAX_LENGTH) + .ToArray()) + .ToArray(); - using var readStream = Console.OpenStandardInput(); + using var readStream = Console.OpenStandardInput(); - if (args[0] == "pipe") - { - using var pipeStream = Console.OpenStandardOutput(); - RunChildProcess(readStream, pipeStream, data); - } + if (args[0] == "pipe") + { + using var pipeStream = Console.OpenStandardOutput(); + RunChildProcess(readStream, pipeStream, data); + } - else if (args[0] == "tcp") + else if (args[0] == "tcp") + { + var tcpClient = new TcpClient() { - var tcpClient = new TcpClient() - { - NoDelay = true - }; + NoDelay = true + }; - tcpClient.Connect("localhost", 55555); - var tcpStream = tcpClient.GetStream(); - RunChildProcess(readStream, tcpStream, data); - } + tcpClient.Connect("localhost", 55555); + var tcpStream = tcpClient.GetStream(); + RunChildProcess(readStream, tcpStream, data); + } - else - { - throw new Exception("Invalid argument (args[0])."); - } + else + { + throw new Exception("Invalid argument (args[0])."); } } + } - static void RunChildProcess(Stream readStream, Stream writeStream, byte[] data) - { - // wait for start signal - Span buffer = stackalloc byte[4]; + static void RunChildProcess(Stream readStream, Stream writeStream, byte[] data) + { + // wait for start signal + Span buffer = stackalloc byte[4]; - while (true) - { - var length = readStream.Read(buffer); + while (true) + { + var length = readStream.Read(buffer); - if (length == 0) - throw new Exception($"The host process terminated early."); + if (length == 0) + throw new Exception($"The host process terminated early."); - var N = BitConverter.ToInt32(buffer); + var N = BitConverter.ToInt32(buffer); - // write - writeStream.Write(data, 0, N * sizeof(int)); - } + // write + writeStream.Write(data, 0, N * sizeof(int)); } } } diff --git a/setup/docker/docker-compose.yml b/setup/docker/docker-compose.yml index 4d35736..b27554f 100644 --- a/setup/docker/docker-compose.yml +++ b/setup/docker/docker-compose.yml @@ -4,7 +4,7 @@ services: main: container_name: nexus-main - image: mcr.microsoft.com/dotnet/sdk:7.0 + image: mcr.microsoft.com/dotnet/sdk:8.0 volumes: - /var/lib/nexus/docker/nexus-main/.ssh:/root/.ssh entrypoint: bash -c "cd; curl -s -O 'https://raw.githubusercontent.com/malstroem-labs/nexus-sources-remote/master/setup/docker/setup-main.sh'; source 'setup-main.sh'" @@ -20,7 +20,7 @@ services: satellite-nexus: container_name: nexus-dotnet - image: mcr.microsoft.com/dotnet/sdk:7.0 + image: mcr.microsoft.com/dotnet/sdk:8.0 volumes: - /var/lib/nexus/docker/nexus-dotnet/.ssh:/root/.ssh entrypoint: bash -c "satellite_id=dotnet;cd; curl -s -O 'https://raw.githubusercontent.com/malstroem-labs/nexus-sources-remote/master/setup/docker/setup-satellite.sh'; source 'setup-satellite.sh'" diff --git a/src/Nexus.Sources.Remote/CastMemoryManager.cs b/src/Nexus.Sources.Remote/CastMemoryManager.cs index bc0398e..2da8e04 100644 --- a/src/Nexus.Sources.Remote/CastMemoryManager.cs +++ b/src/Nexus.Sources.Remote/CastMemoryManager.cs @@ -1,25 +1,22 @@ using System.Buffers; using System.Runtime.InteropServices; -namespace Nexus.Sources -{ - internal class CastMemoryManager : MemoryManager - where TFrom : struct - where TTo : struct - { - private readonly Memory _from; +namespace Nexus.Sources; - public CastMemoryManager(Memory from) => _from = from; +internal class CastMemoryManager(Memory from) : MemoryManager + where TFrom : struct + where TTo : struct +{ + private readonly Memory _from = from; - public override Span GetSpan() => MemoryMarshal.Cast(_from.Span); + public override Span GetSpan() => MemoryMarshal.Cast(_from.Span); - protected override void Dispose(bool disposing) - { - // - } + protected override void Dispose(bool disposing) + { + // + } - public override MemoryHandle Pin(int elementIndex = 0) => throw new NotSupportedException("CastMemoryManager does not support pinning."); + public override MemoryHandle Pin(int elementIndex = 0) => throw new NotSupportedException("CastMemoryManager does not support pinning."); - public override void Unpin() => throw new NotSupportedException("CastMemoryManager does not support unpinning."); - } + public override void Unpin() => throw new NotSupportedException("CastMemoryManager does not support unpinning."); } diff --git a/src/Nexus.Sources.Remote/DataSourceTypes.cs b/src/Nexus.Sources.Remote/DataSourceTypes.cs index ec20ae4..51a7a8a 100644 --- a/src/Nexus.Sources.Remote/DataSourceTypes.cs +++ b/src/Nexus.Sources.Remote/DataSourceTypes.cs @@ -4,78 +4,72 @@ using Nexus.Extensibility; using System.Text.Json; -namespace Nexus.Sources +namespace Nexus.Sources; + +internal interface IJsonRpcServer { - internal interface IJsonRpcServer - { - public Task - GetApiVersionAsync(CancellationToken cancellationToken); + public Task + GetApiVersionAsync(CancellationToken cancellationToken); - public Task - SetContextAsync(DataSourceContext context, CancellationToken cancellationToken); + public Task + SetContextAsync(DataSourceContext context, CancellationToken cancellationToken); - public Task - GetCatalogRegistrationsAsync(string path, CancellationToken cancellationToken); + public Task + GetCatalogRegistrationsAsync(string path, CancellationToken cancellationToken); - public Task - GetCatalogAsync(string catalogId, CancellationToken cancellationToken); + public Task + GetCatalogAsync(string catalogId, CancellationToken cancellationToken); - public Task - GetTimeRangeAsync(string catalogId, CancellationToken cancellationToken); + public Task + GetTimeRangeAsync(string catalogId, CancellationToken cancellationToken); - public Task - GetAvailabilityAsync(string catalogId, DateTime begin, DateTime end, CancellationToken cancellationToken); + public Task + GetAvailabilityAsync(string catalogId, DateTime begin, DateTime end, CancellationToken cancellationToken); - public Task - ReadSingleAsync(DateTime begin, DateTime end, CatalogItem catalogItem, CancellationToken cancellationToken); - } + public Task + ReadSingleAsync(DateTime begin, DateTime end, CatalogItem catalogItem, CancellationToken cancellationToken); +} - internal record ApiVersionResponse(int ApiVersion); - internal record CatalogRegistrationsResponse(CatalogRegistration[] Registrations); - internal record CatalogResponse(ResourceCatalog Catalog); - internal record TimeRangeResponse(DateTime Begin, DateTime End); - internal record AvailabilityResponse(double Availability); - internal record LogMessage(LogLevel LogLevel, string Message); +internal record ApiVersionResponse(int ApiVersion); +internal record CatalogRegistrationsResponse(CatalogRegistration[] Registrations); +internal record CatalogResponse(ResourceCatalog Catalog); +internal record TimeRangeResponse(DateTime Begin, DateTime End); +internal record AvailabilityResponse(double Availability); +internal record LogMessage(LogLevel LogLevel, string Message); - internal class RemoteException : Exception +internal class RemoteException(string message, Exception? innerException = default) : Exception(message, innerException) +{ +} + +internal class JsonElementConverter : Newtonsoft.Json.JsonConverter +{ + internal static JsonSerializerOptions _serializerOptions = new() { - public RemoteException(string message, Exception? innerException = default) - : base(message, innerException) - { - // - } + PropertyNamingPolicy = JsonNamingPolicy.CamelCase + }; + + public override bool CanConvert(Type objectType) + { + var canConvert = objectType == typeof(JsonElement); + return canConvert; + } + + public override object? ReadJson(Newtonsoft.Json.JsonReader reader, Type objectType, object? existingValue, Newtonsoft.Json.JsonSerializer serializer) + { + if (reader.TokenType == Newtonsoft.Json.JsonToken.Null) + return default; + + if (reader.TokenType == Newtonsoft.Json.JsonToken.String) + return JsonSerializer.SerializeToElement(JToken.Load(reader).ToString()); + + var serialized_tmp = JToken.Load(reader).ToString(); + var deserialized = JsonSerializer.Deserialize(serialized_tmp); + return deserialized; } - internal class JsonElementConverter : Newtonsoft.Json.JsonConverter + public override void WriteJson(Newtonsoft.Json.JsonWriter writer, object? value, Newtonsoft.Json.JsonSerializer serializer) { - internal static JsonSerializerOptions _serializerOptions = new() - { - PropertyNamingPolicy = JsonNamingPolicy.CamelCase - }; - - public override bool CanConvert(Type objectType) - { - var canConvert = objectType == typeof(JsonElement); - return canConvert; - } - - public override object? ReadJson(Newtonsoft.Json.JsonReader reader, Type objectType, object? existingValue, Newtonsoft.Json.JsonSerializer serializer) - { - if (reader.TokenType == Newtonsoft.Json.JsonToken.Null) - return default; - - if (reader.TokenType == Newtonsoft.Json.JsonToken.String) - return JsonSerializer.SerializeToElement(JToken.Load(reader).ToString()); - - var serialized_tmp = JToken.Load(reader).ToString(); - var deserialized = JsonSerializer.Deserialize(serialized_tmp); - return deserialized; - } - - public override void WriteJson(Newtonsoft.Json.JsonWriter writer, object? value, Newtonsoft.Json.JsonSerializer serializer) - { - var jsonString = JsonSerializer.Serialize(value, _serializerOptions); - writer.WriteRawValue(jsonString); - } + var jsonString = JsonSerializer.Serialize(value, _serializerOptions); + writer.WriteRawValue(jsonString); } } diff --git a/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj b/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj index f0d290f..f3b3844 100644 --- a/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj +++ b/src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj @@ -11,11 +11,11 @@ - + runtime;native - + diff --git a/src/Nexus.Sources.Remote/PropertiesExtensions.cs b/src/Nexus.Sources.Remote/PropertiesExtensions.cs index 10b713f..497b7bb 100644 --- a/src/Nexus.Sources.Remote/PropertiesExtensions.cs +++ b/src/Nexus.Sources.Remote/PropertiesExtensions.cs @@ -1,71 +1,70 @@ using System.Text.Json; -namespace Nexus.Sources +namespace Nexus.Sources; + +internal static class PropertiesExtensions { - internal static class PropertiesExtensions + public static int? GetIntValue(this IReadOnlyDictionary properties, string propertyPath) { - public static int? GetIntValue(this IReadOnlyDictionary properties, string propertyPath) + var pathSegments = propertyPath.Split('/').AsSpan(); + + if (properties.TryGetValue(pathSegments[0], out var element)) { - var pathSegments = propertyPath.Split('/').AsSpan(); + pathSegments = pathSegments[1..]; - if (properties.TryGetValue(pathSegments[0], out var element)) + if (pathSegments.Length == 0) { - pathSegments = pathSegments[1..]; - - if (pathSegments.Length == 0) - { - if (element.ValueKind == JsonValueKind.Number) - return element.GetInt32(); - } - - else - { - var newPropertyPath = string.Join('/', pathSegments.ToArray()); - return element.GetIntValue(newPropertyPath); - } + if (element.ValueKind == JsonValueKind.Number) + return element.GetInt32(); } - return default; + else + { + var newPropertyPath = string.Join('/', pathSegments.ToArray()); + return element.GetIntValue(newPropertyPath); + } } - public static int? GetIntValue(this JsonElement properties, string propertyPath) - { - var pathSegments = propertyPath.Split('/').AsSpan(); - var root = properties.GetJsonObjectFromPath(pathSegments[0..^1]); + return default; + } - var propertyName = pathSegments.Length == 0 - ? propertyPath - : pathSegments[^1]; + public static int? GetIntValue(this JsonElement properties, string propertyPath) + { + var pathSegments = propertyPath.Split('/').AsSpan(); + var root = properties.GetJsonObjectFromPath(pathSegments[0..^1]); - if (root.ValueKind == JsonValueKind.Object && - root.TryGetProperty(propertyName, out var propertyValue) && - (propertyValue.ValueKind == JsonValueKind.Number)) - return propertyValue.GetInt32(); + var propertyName = pathSegments.Length == 0 + ? propertyPath + : pathSegments[^1]; - return default; - } + if (root.ValueKind == JsonValueKind.Object && + root.TryGetProperty(propertyName, out var propertyValue) && + (propertyValue.ValueKind == JsonValueKind.Number)) + return propertyValue.GetInt32(); - private static JsonElement GetJsonObjectFromPath(this JsonElement root, Span pathSegements) - { - if (pathSegements.Length == 0) - return root; + return default; + } - var current = root; + private static JsonElement GetJsonObjectFromPath(this JsonElement root, Span pathSegements) + { + if (pathSegements.Length == 0) + return root; - foreach (var pathSegement in pathSegements) + var current = root; + + foreach (var pathSegement in pathSegements) + { + if (current.ValueKind == JsonValueKind.Object && + current.TryGetProperty(pathSegement, out current)) { - if (current.ValueKind == JsonValueKind.Object && - current.TryGetProperty(pathSegement, out current)) - { - // do nothing - } - else - { - return default; - } + // do nothing + } + else + { + return default; } - - return current; } + + return current; } } \ No newline at end of file diff --git a/src/Nexus.Sources.Remote/Remote.cs b/src/Nexus.Sources.Remote/Remote.cs index 2745c88..3481034 100644 --- a/src/Nexus.Sources.Remote/Remote.cs +++ b/src/Nexus.Sources.Remote/Remote.cs @@ -4,327 +4,313 @@ using System.Buffers; using System.Net; using System.Reflection; -using System.Runtime.InteropServices; using System.Text.Json; using System.Text.RegularExpressions; -namespace Nexus.Sources +namespace Nexus.Sources; + +[ExtensionDescription( + "Provides access to remote databases", + "https://github.com/malstroem-labs/nexus-sources-remote", + "https://github.com/malstroem-labs/nexus-sources-remote")] +public partial class Remote : IDataSource, IDisposable { - [ExtensionDescription( - "Provides access to remote databases", - "https://github.com/malstroem-labs/nexus-sources-remote", - "https://github.com/malstroem-labs/nexus-sources-remote")] - public partial class Remote : IDataSource, IDisposable + #region Fields + + private ReadDataHandler? _readData; + private static readonly int API_LEVEL = 1; + private RemoteCommunicator _communicator = default!; + private IJsonRpcServer _rpcServer = default!; + + #endregion + + #region Properties + + /* Possible features to be implemented for this data source: + * + * Transports: + * - anonymous pipes (done) + * - named pipes client + * - tcp client + * - shared memory + * - ... + * + * Protocols: + * - JsonRpc + binary data stream (done) + * - 0mq + * - messagepack + * - gRPC + * - ... + */ + + private DataSourceContext Context { get; set; } = default!; + + #endregion + + #region Methods + + public async Task SetContextAsync( + DataSourceContext context, + ILogger logger, + CancellationToken cancellationToken) { - #region Fields - - private ReadDataHandler? _readData; - private static readonly int API_LEVEL = 1; - private RemoteCommunicator _communicator = default!; - private IJsonRpcServer _rpcServer = default!; - - #endregion - - #region Properties - - /* Possible features to be implemented for this data source: - * - * Transports: - * - anonymous pipes (done) - * - named pipes client - * - tcp client - * - shared memory - * - ... - * - * Protocols: - * - JsonRpc + binary data stream (done) - * - 0mq - * - messagepack - * - gRPC - * - ... - */ - - private DataSourceContext Context { get; set; } = default!; - - #endregion - - #region Methods - - public async Task SetContextAsync( - DataSourceContext context, - ILogger logger, - CancellationToken cancellationToken) - { - Context = context; + Context = context; - // mode - var mode = Context.SourceConfiguration?.GetStringValue("mode") ?? "tcp"; + // mode + var mode = Context.SourceConfiguration?.GetStringValue("mode") ?? "tcp"; - if (mode != "tcp") - throw new NotSupportedException($"The mode {mode} is not supported."); + if (mode != "tcp") + throw new NotSupportedException($"The mode {mode} is not supported."); - // listen-address - var listenAddressString = Context.SourceConfiguration?.GetStringValue("listen-address") ?? "0.0.0.0"; + // listen-address + var listenAddressString = Context.SourceConfiguration?.GetStringValue("listen-address") ?? "0.0.0.0"; - if (!IPAddress.TryParse(listenAddressString, out var listenAddress)) - throw new ArgumentException("The listen-address parameter is not a valid IP-Address."); + if (!IPAddress.TryParse(listenAddressString, out var listenAddress)) + throw new ArgumentException("The listen-address parameter is not a valid IP-Address."); - // listen-port - var listenPortMin = Context.SourceConfiguration?.GetIntValue("listen-port-min") ?? 49152; + // listen-port + var listenPortMin = Context.SourceConfiguration?.GetIntValue("listen-port-min") ?? 49152; - if (!(1 <= listenPortMin && listenPortMin < 65536)) - throw new ArgumentException("The listen-port-min parameter is invalid."); + if (!(1 <= listenPortMin && listenPortMin < 65536)) + throw new ArgumentException("The listen-port-min parameter is invalid."); - var listenPortMax = Context.SourceConfiguration?.GetIntValue("listen-port-max") ?? 65536; + var listenPortMax = Context.SourceConfiguration?.GetIntValue("listen-port-max") ?? 65536; - if (!(1 <= listenPortMin && listenPortMin < 65536)) - throw new ArgumentException("The listen-port-max parameter is invalid."); + if (!(1 <= listenPortMin && listenPortMin < 65536)) + throw new ArgumentException("The listen-port-max parameter is invalid."); - // template - var templateId = Context.SourceConfiguration?.GetStringValue("template"); + // template + var templateId = (Context.SourceConfiguration?.GetStringValue("template")) ?? throw new KeyNotFoundException("The template parameter must be provided."); - if (templateId is null) - throw new KeyNotFoundException("The template parameter must be provided."); + // environment variables + var requestConfiguration = Context.SourceConfiguration!; + var environmentVariables = new Dictionary(); - // environment variables - var requestConfiguration = Context.SourceConfiguration!; - var environmentVariables = new Dictionary(); + if (requestConfiguration.TryGetValue("environment-variables", out var propertyValue) && + propertyValue.ValueKind == JsonValueKind.Object) + { + var environmentVariablesRaw = propertyValue.Deserialize>(); - if (requestConfiguration.TryGetValue("environment-variables", out var propertyValue) && - propertyValue.ValueKind == JsonValueKind.Object) - { - var environmentVariablesRaw = propertyValue.Deserialize>(); - - if (environmentVariablesRaw is not null) - environmentVariables = environmentVariablesRaw - .Where(entry => entry.Value.ValueKind == JsonValueKind.String) - .ToDictionary(entry => entry.Key, entry => entry.Value.GetString() ?? ""); - } + if (environmentVariablesRaw is not null) + environmentVariables = environmentVariablesRaw + .Where(entry => entry.Value.ValueKind == JsonValueKind.String) + .ToDictionary(entry => entry.Key, entry => entry.Value.GetString() ?? ""); + } - // Build command - var actualCommand = BuildCommand(templateId); + // Build command + var actualCommand = BuildCommand(templateId); - // - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + // + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - _communicator = new RemoteCommunicator( - actualCommand, - environmentVariables, - listenAddress, - listenPortMin, - listenPortMax, - HandleReadDataAsync, - logger); + _communicator = new RemoteCommunicator( + actualCommand, + environmentVariables, + listenAddress, + listenPortMin, + listenPortMax, + HandleReadDataAsync, + logger); - _rpcServer = await _communicator.ConnectAsync(timeoutTokenSource.Token); + _rpcServer = await _communicator.ConnectAsync(timeoutTokenSource.Token); - var apiVersion = (await _rpcServer.GetApiVersionAsync(timeoutTokenSource.Token)).ApiVersion; + var apiVersion = (await _rpcServer.GetApiVersionAsync(timeoutTokenSource.Token)).ApiVersion; - if (apiVersion < 1 || apiVersion > API_LEVEL) - throw new Exception($"The API level '{apiVersion}' is not supported."); + if (apiVersion < 1 || apiVersion > API_LEVEL) + throw new Exception($"The API level '{apiVersion}' is not supported."); - logger.LogTrace("Set context to remote client"); + logger.LogTrace("Set context to remote client"); - await _rpcServer - .SetContextAsync(context, timeoutTokenSource.Token); + await _rpcServer + .SetContextAsync(context, timeoutTokenSource.Token); - logger.LogDebug("Done preparing remote client"); - } + logger.LogDebug("Done preparing remote client"); + } - public async Task GetCatalogRegistrationsAsync( - string path, - CancellationToken cancellationToken) - { - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - cancellationToken.Register(() => timeoutTokenSource.Cancel()); + public async Task GetCatalogRegistrationsAsync( + string path, + CancellationToken cancellationToken) + { + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + cancellationToken.Register(() => timeoutTokenSource.Cancel()); - var response = await _rpcServer - .GetCatalogRegistrationsAsync(path, timeoutTokenSource.Token); + var response = await _rpcServer + .GetCatalogRegistrationsAsync(path, timeoutTokenSource.Token); - return response.Registrations; - } + return response.Registrations; + } - public async Task GetCatalogAsync( - string catalogId, - CancellationToken cancellationToken) - { - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - cancellationToken.Register(() => timeoutTokenSource.Cancel()); + public async Task GetCatalogAsync( + string catalogId, + CancellationToken cancellationToken) + { + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + cancellationToken.Register(() => timeoutTokenSource.Cancel()); - var response = await _rpcServer - .GetCatalogAsync(catalogId, timeoutTokenSource.Token); + var response = await _rpcServer + .GetCatalogAsync(catalogId, timeoutTokenSource.Token); - return response.Catalog; - } + return response.Catalog; + } - public async Task<(DateTime Begin, DateTime End)> GetTimeRangeAsync( - string catalogId, - CancellationToken cancellationToken) - { - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - cancellationToken.Register(() => timeoutTokenSource.Cancel()); + public async Task<(DateTime Begin, DateTime End)> GetTimeRangeAsync( + string catalogId, + CancellationToken cancellationToken) + { + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + cancellationToken.Register(() => timeoutTokenSource.Cancel()); - var response = await _rpcServer - .GetTimeRangeAsync(catalogId, timeoutTokenSource.Token); + var response = await _rpcServer + .GetTimeRangeAsync(catalogId, timeoutTokenSource.Token); - var begin = response.Begin.ToUniversalTime(); - var end = response.End.ToUniversalTime(); + var begin = response.Begin.ToUniversalTime(); + var end = response.End.ToUniversalTime(); - return (begin, end); - } + return (begin, end); + } - public async Task GetAvailabilityAsync( - string catalogId, - DateTime begin, - DateTime end, - CancellationToken cancellationToken) - { - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - cancellationToken.Register(() => timeoutTokenSource.Cancel()); + public async Task GetAvailabilityAsync( + string catalogId, + DateTime begin, + DateTime end, + CancellationToken cancellationToken) + { + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + cancellationToken.Register(() => timeoutTokenSource.Cancel()); - var response = await _rpcServer - .GetAvailabilityAsync(catalogId, begin, end, timeoutTokenSource.Token); + var response = await _rpcServer + .GetAvailabilityAsync(catalogId, begin, end, timeoutTokenSource.Token); - return response.Availability; - } + return response.Availability; + } + + public async Task ReadAsync( + DateTime begin, + DateTime end, + ReadRequest[] requests, + ReadDataHandler readData, + IProgress progress, + CancellationToken cancellationToken) + { + _readData = readData; - public async Task ReadAsync( - DateTime begin, - DateTime end, - ReadRequest[] requests, - ReadDataHandler readData, - IProgress progress, - CancellationToken cancellationToken) + try { - _readData = readData; + var counter = 0.0; - try + foreach (var (catalogItem, data, status) in requests) { - var counter = 0.0; + cancellationToken.ThrowIfCancellationRequested(); - foreach (var (catalogItem, data, status) in requests) - { - cancellationToken.ThrowIfCancellationRequested(); + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + cancellationToken.Register(() => timeoutTokenSource.Cancel()); - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - cancellationToken.Register(() => timeoutTokenSource.Cancel()); + var elementCount = data.Length / catalogItem.Representation.ElementSize; - var elementCount = data.Length / catalogItem.Representation.ElementSize; + await _rpcServer + .ReadSingleAsync(begin, end, catalogItem, timeoutTokenSource.Token); - await _rpcServer - .ReadSingleAsync(begin, end, catalogItem, timeoutTokenSource.Token); + await _communicator.ReadRawAsync(data, timeoutTokenSource.Token); + await _communicator.ReadRawAsync(status, timeoutTokenSource.Token); - await _communicator.ReadRawAsync(data, timeoutTokenSource.Token); - await _communicator.ReadRawAsync(status, timeoutTokenSource.Token); - - progress.Report(++counter / requests.Length); - } - } - finally - { - _readData = null; + progress.Report(++counter / requests.Length); } } - - private string BuildCommand(string templateId) + finally { - var template = Context.SystemConfiguration? - .GetStringValue($"{typeof(Remote).FullName}/templates/{templateId}"); - - if (template is null) - throw new Exception($"The template {templateId} does not exist."); - - var command = CommandRegex().Replace(template, match => - { - var parameterKey = match.Groups[1].Value; - var parameterValue = Context.SourceConfiguration?.GetStringValue(parameterKey); - - if (parameterValue is null) - throw new Exception($"The {parameterKey} parameter must be provided."); - - return parameterValue; - }); - - return command; + _readData = null; } + } - // copy from Nexus -> DataModelUtilities - private static readonly Regex _resourcePathEvaluator = new(@"^(?'catalog'.*)\/(?'resource'.*)\/(?'sample_period'[0-9]+_[a-zA-Z]+)(?:_(?'kind'[^\(#\s]+))?(?:\((?'parameters'.*)\))?(?:#(?'fragment'.*))?$", RegexOptions.Compiled); + private string BuildCommand(string templateId) + { + var template = (Context.SystemConfiguration? + .GetStringValue($"{typeof(Remote).FullName}/templates/{templateId}")) ?? throw new Exception($"The template {templateId} does not exist."); + var command = CommandRegex().Replace(template, match => + { + var parameterKey = match.Groups[1].Value; + var parameterValue = (Context.SourceConfiguration?.GetStringValue(parameterKey)) ?? throw new Exception($"The {parameterKey} parameter must be provided."); + return parameterValue; + }); - private static MethodInfo _toSamplePeriodMethodInfo = typeof(DataModelExtensions) - .GetMethod("ToSamplePeriod", BindingFlags.Static | BindingFlags.NonPublic) ?? throw new Exception("Unable to locate ToSamplePeriod method."); + return command; + } - private async Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end) - { - // copy of _readData handler - var localReadData = _readData; + // copy from Nexus -> DataModelUtilities + private static readonly Regex _resourcePathEvaluator = MyRegex(); - if (localReadData is null) - throw new InvalidOperationException("Unable to read data without previous invocation of the ReadAsync method."); + private static readonly MethodInfo _toSamplePeriodMethodInfo = typeof(DataModelExtensions) + .GetMethod("ToSamplePeriod", BindingFlags.Static | BindingFlags.NonPublic) ?? throw new Exception("Unable to locate ToSamplePeriod method."); - // timeout token source - var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); + private async Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end) + { + // copy of _readData handler + var localReadData = _readData ?? throw new InvalidOperationException("Unable to read data without previous invocation of the ReadAsync method."); - // find sample period - var match = _resourcePathEvaluator.Match(resourcePath); + // timeout token source + var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1)); - if (!match.Success) - throw new Exception("Invalid resource path"); + // find sample period + var match = _resourcePathEvaluator.Match(resourcePath); - var samplePeriod = (TimeSpan)_toSamplePeriodMethodInfo.Invoke(null, new object[] { - match.Groups["sample_period"].Value - })!; + if (!match.Success) + throw new Exception("Invalid resource path"); - // find buffer length and rent buffer - var length = (int)((end - begin).Ticks / samplePeriod.Ticks); + var samplePeriod = (TimeSpan)_toSamplePeriodMethodInfo.Invoke(null, new object[] { + match.Groups["sample_period"].Value + })!; - using var memoryOwner = MemoryPool.Shared.Rent(length); - var buffer = memoryOwner.Memory.Slice(0, length); + // find buffer length and rent buffer + var length = (int)((end - begin).Ticks / samplePeriod.Ticks); - // read data - await localReadData(resourcePath, begin, end, buffer, timeoutTokenSource.Token); - var byteBuffer = new CastMemoryManager(buffer).Memory; + using var memoryOwner = MemoryPool.Shared.Rent(length); + var buffer = memoryOwner.Memory[..length]; - // write to communicator - await _communicator.WriteRawAsync(byteBuffer, timeoutTokenSource.Token); - } + // read data + await localReadData(resourcePath, begin, end, buffer, timeoutTokenSource.Token); + var byteBuffer = new CastMemoryManager(buffer).Memory; - private static CancellationTokenSource GetTimeoutTokenSource(TimeSpan timeout) - { - var timeoutToken = new CancellationTokenSource(); - timeoutToken.CancelAfter(timeout); + // write to communicator + await _communicator.WriteRawAsync(byteBuffer, timeoutTokenSource.Token); + } - return timeoutToken; - } + private static CancellationTokenSource GetTimeoutTokenSource(TimeSpan timeout) + { + var timeoutToken = new CancellationTokenSource(); + timeoutToken.CancelAfter(timeout); - #endregion + return timeoutToken; + } - #region IDisposable + #endregion - private bool _disposedValue; + #region IDisposable - protected virtual void Dispose(bool disposing) + private bool _disposedValue; + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) { - if (!_disposedValue) + if (disposing) { - if (disposing) - { - _communicator?.Dispose(); - } - - _disposedValue = true; + _communicator?.Dispose(); } - } - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); + _disposedValue = true; } + } - [GeneratedRegex("{(.*?)}")] - private static partial Regex CommandRegex(); - - #endregion + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); } + + [GeneratedRegex("{(.*?)}")] + private static partial Regex CommandRegex(); + [GeneratedRegex(@"^(?'catalog'.*)\/(?'resource'.*)\/(?'sample_period'[0-9]+_[a-zA-Z]+)(?:_(?'kind'[^\(#\s]+))?(?:\((?'parameters'.*)\))?(?:#(?'fragment'.*))?$", RegexOptions.Compiled)] + private static partial Regex MyRegex(); + + #endregion } diff --git a/src/Nexus.Sources.Remote/RemoteCommunicator.cs b/src/Nexus.Sources.Remote/RemoteCommunicator.cs index a0c35fc..af146e5 100644 --- a/src/Nexus.Sources.Remote/RemoteCommunicator.cs +++ b/src/Nexus.Sources.Remote/RemoteCommunicator.cs @@ -9,312 +9,312 @@ using Newtonsoft.Json.Serialization; using StreamJsonRpc; -namespace Nexus.Sources +namespace Nexus.Sources; + +internal partial class RemoteCommunicator { - internal partial class RemoteCommunicator - { - #region Fields + #region Fields - private static readonly object _lock = new(); - private static int _nextMin = -1; - private readonly TcpListener _tcpListener; - private Stream _commStream = default!; - private Stream _dataStream = default!; - private IJsonRpcServer _rpcServer = default!; + private static readonly object _lock = new(); + private static int _nextMin = -1; + private readonly TcpListener _tcpListener; + private Stream _commStream = default!; + private Stream _dataStream = default!; + private IJsonRpcServer _rpcServer = default!; - private readonly ILogger _logger; - private readonly Func _readData; + private readonly ILogger _logger; + private readonly Func _readData; - private readonly string _command; - private readonly string _arguments; - private readonly Dictionary _environmentVariables; + private readonly string _command; + private readonly string _arguments; + private readonly Dictionary _environmentVariables; - private Process _process = default!; + private Process _process = default!; - #endregion + #endregion - #region Constructors + #region Constructors - public RemoteCommunicator( - string command, - Dictionary environmentVariables, - IPAddress listenAddress, - int listenPortMin, - int listenPortMax, - Func readData, - ILogger logger) - { - _environmentVariables = environmentVariables; - _readData = readData; - _logger = logger; + public RemoteCommunicator( + string command, + Dictionary environmentVariables, + IPAddress listenAddress, + int listenPortMin, + int listenPortMax, + Func readData, + ILogger logger) + { + _environmentVariables = environmentVariables; + _readData = readData; + _logger = logger; - var listenPort = GetNextUnusedPort(listenPortMin, listenPortMax); + var listenPort = GetNextUnusedPort(listenPortMin, listenPortMax); - command = CommandRegex().Replace(command, listenPort.ToString()); - var commandParts = command.Split(" ", count: 2); - _command = commandParts[0]; + command = CommandRegex().Replace(command, listenPort.ToString()); + var commandParts = command.Split(" ", count: 2); + _command = commandParts[0]; - _arguments = commandParts.Length == 2 - ? commandParts[1] - : ""; + _arguments = commandParts.Length == 2 + ? commandParts[1] + : ""; - _tcpListener = new TcpListener(listenAddress, listenPort); - _tcpListener.Start(); - } + _tcpListener = new TcpListener(listenAddress, listenPort); + _tcpListener.Start(); + } - #endregion + #endregion - #region Methods + #region Methods - public async Task ConnectAsync(CancellationToken cancellationToken) + public async Task ConnectAsync(CancellationToken cancellationToken) + { + try { - try + cancellationToken.Register(_tcpListener.Stop); + + // start process + _logger.LogDebug("Start process."); + + var psi = new ProcessStartInfo(_command) { - cancellationToken.Register(_tcpListener.Stop); - - // start process - _logger.LogDebug("Start process."); - - var psi = new ProcessStartInfo(_command) - { - Arguments = _arguments, - }; + Arguments = _arguments, + }; - foreach (var variable in _environmentVariables) - { - psi.EnvironmentVariables[variable.Key] = variable.Value; - } + foreach (var variable in _environmentVariables) + { + psi.EnvironmentVariables[variable.Key] = variable.Value; + } - psi.RedirectStandardOutput = true; - psi.RedirectStandardError = true; + psi.RedirectStandardOutput = true; + psi.RedirectStandardError = true; - _process = new Process() { StartInfo = psi }; - _process.Start(); + _process = new Process() { StartInfo = psi }; + _process.Start(); - _process.OutputDataReceived += (sender, e) => - { - if (!string.IsNullOrWhiteSpace(e.Data)) - _logger.LogDebug("{Message}", e.Data); - }; + _process.OutputDataReceived += (sender, e) => + { + if (!string.IsNullOrWhiteSpace(e.Data)) + _logger.LogDebug("{Message}", e.Data); + }; - _process.ErrorDataReceived += (sender, e) => - { - if (!string.IsNullOrWhiteSpace(e.Data)) - _logger.LogWarning("{Message}", e.Data); - }; + _process.ErrorDataReceived += (sender, e) => + { + if (!string.IsNullOrWhiteSpace(e.Data)) + _logger.LogWarning("{Message}", e.Data); + }; - _process.BeginOutputReadLine(); - _process.BeginErrorReadLine(); + _process.BeginOutputReadLine(); + _process.BeginErrorReadLine(); - // wait for clients to connect - _logger.LogDebug("Wait for clients to connect."); + // wait for clients to connect + _logger.LogDebug("Wait for clients to connect."); - var filters = new string[] { "comm", "data" }; + var filters = new string[] { "comm", "data" }; - Stream? commStream = default; - Stream? dataStream = default; + Stream? commStream = default; + Stream? dataStream = default; - for (int i = 0; i < 2; i++) - { - var (identifier, client) = await GetTcpClientAsync(filters, cancellationToken); + for (int i = 0; i < 2; i++) + { + var (identifier, client) = await GetTcpClientAsync(filters, cancellationToken); - if (commStream is null && identifier == "comm") - commStream = client.GetStream(); + if (commStream is null && identifier == "comm") + commStream = client.GetStream(); - else if (dataStream is null && identifier == "data") - dataStream = client.GetStream(); - } + else if (dataStream is null && identifier == "data") + dataStream = client.GetStream(); + } - if (commStream is null || dataStream is null) - throw new Exception("The RPC server did not connect properly via communication and a data stream. This may indicate that other TCP clients have tried to connect."); + if (commStream is null || dataStream is null) + throw new Exception("The RPC server did not connect properly via communication and a data stream. This may indicate that other TCP clients have tried to connect."); - _commStream = commStream; - _dataStream = dataStream; + _commStream = commStream; + _dataStream = dataStream; - var formatter = new JsonMessageFormatter() - { - JsonSerializer = { - ContractResolver = new DefaultContractResolver - { - NamingStrategy = new CamelCaseNamingStrategy() - } + var formatter = new JsonMessageFormatter() + { + JsonSerializer = { + ContractResolver = new DefaultContractResolver + { + NamingStrategy = new CamelCaseNamingStrategy() } - }; - - formatter.JsonSerializer.Converters.Add(new JsonElementConverter()); - formatter.JsonSerializer.Converters.Add(new StringEnumConverter()); + } + }; - var messageHandler = new LengthHeaderMessageHandler(commStream, commStream, formatter); - var jsonRpc = new JsonRpc(messageHandler); + formatter.JsonSerializer.Converters.Add(new JsonElementConverter()); + formatter.JsonSerializer.Converters.Add(new StringEnumConverter()); - jsonRpc.AddLocalRpcMethod("log", new Action((logLevel, message) => - { - _logger.Log(logLevel, "{Message}", message); - })); + var messageHandler = new LengthHeaderMessageHandler(commStream, commStream, formatter); + var jsonRpc = new JsonRpc(messageHandler); - jsonRpc.AddLocalRpcMethod("readData", _readData); - jsonRpc.StartListening(); + jsonRpc.AddLocalRpcMethod("log", new Action((logLevel, message) => + { + _logger.Log(logLevel, "{Message}", message); + })); - _rpcServer = jsonRpc.Attach(new JsonRpcProxyOptions() - { - MethodNameTransform = pascalCaseAsyncName => - { - return char.ToLower(pascalCaseAsyncName[0]) + pascalCaseAsyncName[1..].Replace("Async", string.Empty); - } - }); + jsonRpc.AddLocalRpcMethod("readData", _readData); + jsonRpc.StartListening(); - return _rpcServer; - } - catch + _rpcServer = jsonRpc.Attach(new JsonRpcProxyOptions() { - try + MethodNameTransform = pascalCaseAsyncName => { - _process?.Kill(); - } - catch { - // + return char.ToLower(pascalCaseAsyncName[0]) + pascalCaseAsyncName[1..].Replace("Async", string.Empty); } - - throw; + }); + + return _rpcServer; + } + catch + { + try + { + _process?.Kill(); } - finally + catch { - _tcpListener.Stop(); + // } - } - public Task ReadRawAsync(Memory buffer, CancellationToken cancellationToken) + throw; + } + finally { - return InternalReadRawAsync(buffer, _dataStream, cancellationToken); + _tcpListener.Stop(); } + } + + public Task ReadRawAsync(Memory buffer, CancellationToken cancellationToken) + { + return InternalReadRawAsync(buffer, _dataStream, cancellationToken); + } - private static async Task InternalReadRawAsync(Memory buffer, Stream source, CancellationToken cancellationToken) + private static async Task InternalReadRawAsync(Memory buffer, Stream source, CancellationToken cancellationToken) + { + while (buffer.Length > 0) { - while (buffer.Length > 0) - { - cancellationToken.ThrowIfCancellationRequested(); - var readCount = await source.ReadAsync(buffer, cancellationToken); + cancellationToken.ThrowIfCancellationRequested(); + var readCount = await source.ReadAsync(buffer, cancellationToken); - if (readCount == 0) - throw new Exception("The TCP connection closed early."); + if (readCount == 0) + throw new Exception("The TCP connection closed early."); - buffer = buffer[readCount..]; - } + buffer = buffer[readCount..]; } + } - public Task WriteRawAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) - { - return InternalWriteRawAsync(buffer, _dataStream, cancellationToken); - } + public Task WriteRawAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken) + { + return InternalWriteRawAsync(buffer, _dataStream, cancellationToken); + } - private static async Task InternalWriteRawAsync(ReadOnlyMemory buffer, Stream target, CancellationToken cancellationToken) - { - var length = BitConverter.GetBytes(buffer.Length).Reverse().ToArray(); + private static async Task InternalWriteRawAsync(ReadOnlyMemory buffer, Stream target, CancellationToken cancellationToken) + { + var length = BitConverter.GetBytes(buffer.Length).Reverse().ToArray(); - await target.WriteAsync(length, cancellationToken); - await target.WriteAsync(buffer, cancellationToken); - await target.FlushAsync(cancellationToken); - } + await target.WriteAsync(length, cancellationToken); + await target.WriteAsync(buffer, cancellationToken); + await target.FlushAsync(cancellationToken); + } - private static int GetNextUnusedPort(int min, int max) + private static int GetNextUnusedPort(int min, int max) + { + lock (_lock) { - lock (_lock) - { - min = Math.Max(_nextMin, min); + min = Math.Max(_nextMin, min); - if (max <= min) - throw new ArgumentException("Max port cannot be less than or equal to min."); + if (max <= min) + throw new ArgumentException("Max port cannot be less than or equal to min."); - var ipProperties = IPGlobalProperties.GetIPGlobalProperties(); + var ipProperties = IPGlobalProperties.GetIPGlobalProperties(); - var usedPorts = - ipProperties.GetActiveTcpConnections() - .Where(connection => connection.State != TcpState.Closed) - .Select(connection => connection.LocalEndPoint) - .Concat(ipProperties.GetActiveTcpListeners()) - .Select(endpoint => endpoint.Port) - .ToArray(); + var usedPorts = + ipProperties.GetActiveTcpConnections() + .Where(connection => connection.State != TcpState.Closed) + .Select(connection => connection.LocalEndPoint) + .Concat(ipProperties.GetActiveTcpListeners()) + .Select(endpoint => endpoint.Port) + .ToArray(); - var firstUnused = - Enumerable.Range(min, max - min) - .Where(port => !usedPorts.Contains(port)) - .Select(port => new int?(port)) - .FirstOrDefault(); + var firstUnused = + Enumerable.Range(min, max - min) + .Where(port => !usedPorts.Contains(port)) + .Select(port => new int?(port)) + .FirstOrDefault(); - if (!firstUnused.HasValue) - throw new Exception($"All TCP ports in the range of {min}..{max} are currently in use."); + if (!firstUnused.HasValue) + throw new Exception($"All TCP ports in the range of {min}..{max} are currently in use."); - _nextMin = (firstUnused.Value + 1) % max; - return firstUnused.Value; - } + _nextMin = (firstUnused.Value + 1) % max; + return firstUnused.Value; } + } - private async Task<(string Identifier, TcpClient Client)> GetTcpClientAsync(string[] filters, CancellationToken cancellationToken) - { - var buffer = new byte[4]; - var client = await _tcpListener.AcceptTcpClientAsync(cancellationToken); - - await InternalReadRawAsync(buffer, client.GetStream(), cancellationToken); + private async Task<(string Identifier, TcpClient Client)> GetTcpClientAsync(string[] filters, CancellationToken cancellationToken) + { + var buffer = new byte[4]; + var client = await _tcpListener.AcceptTcpClientAsync(cancellationToken); - foreach (var filter in filters) - { - if (buffer.SequenceEqual(Encoding.UTF8.GetBytes(filter))) - return (filter, client); - } + await InternalReadRawAsync(buffer, client.GetStream(), cancellationToken); - throw new Exception("Invalid stream identifier received."); + foreach (var filter in filters) + { + if (buffer.SequenceEqual(Encoding.UTF8.GetBytes(filter))) + return (filter, client); } - #endregion + throw new Exception("Invalid stream identifier received."); + } - #region IDisposable + #endregion - private bool _disposedValue; + #region IDisposable - protected virtual void Dispose(bool disposing) + private bool _disposedValue; + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) { - if (!_disposedValue) + if (disposing) { - if (disposing) + try { + var disposable = _rpcServer as IDisposable; + disposable?.Dispose(); + + _commStream?.Dispose(); + _dataStream?.Dispose(); + try { - var disposable = _rpcServer as IDisposable; - disposable?.Dispose(); - - _commStream?.Dispose(); - _dataStream?.Dispose(); - - try - { - _process?.Kill(); - } - catch - { - // - } + _process?.Kill(); } - catch (Exception) + catch { // } - - //_process?.WaitForExitAsync(); + } + catch (Exception) + { + // } - _disposedValue = true; + //_process?.WaitForExitAsync(); } - } - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); + _disposedValue = true; } + } - [GeneratedRegex("{remote-port}")] - private static partial Regex CommandRegex(); - - #endregion + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); } + + [GeneratedRegex("{remote-port}")] + private static partial Regex CommandRegex(); + + #endregion } diff --git a/src/remoting/dotnet-remoting/Remoting.cs b/src/remoting/dotnet-remoting/Remoting.cs index ad9f901..0288ee4 100644 --- a/src/remoting/dotnet-remoting/Remoting.cs +++ b/src/remoting/dotnet-remoting/Remoting.cs @@ -12,14 +12,9 @@ namespace Nexus.Remoting; -internal class Logger : ILogger +internal class Logger(NetworkStream tcpCommSocketStream) : ILogger { - private readonly NetworkStream _tcpCommSocketStream; - - public Logger(NetworkStream tcpCommSocketStream) - { - _tcpCommSocketStream = tcpCommSocketStream; - } + private readonly NetworkStream _tcpCommSocketStream = tcpCommSocketStream; public IDisposable BeginScope(TState state) { @@ -111,7 +106,7 @@ static JsonElement Read(Span jsonRequest) var size = ReadSize(_tcpCommSocketStream); using var memoryOwner = MemoryPool.Shared.Rent(size); - var messageMemory = memoryOwner.Memory.Slice(0, size); + var messageMemory = memoryOwner.Memory[..size]; _tcpCommSocketStream.ReadExactly(messageMemory.Span, _logger); var request = Read(messageMemory.Span); @@ -307,7 +302,7 @@ static JsonElement Read(Span jsonRequest) await _dataSource.ReadAsync( begin, end, - new ReadRequest[] { readRequest }, + [readRequest], HandleReadDataAsync, new Progress(), CancellationToken.None @@ -362,7 +357,7 @@ private async Task HandleReadDataAsync( throw new Exception("Data returned by Nexus have an unexpected length"); _logger.LogTrace("Try to read {ByteCount} bytes from Nexus", size); - + _tcpDataSocketStream.ReadExactly(MemoryMarshal.AsBytes(buffer.Span), _logger); } @@ -428,18 +423,16 @@ public static void ReadExactly(this Stream stream, Span buffer, ILogger lo Environment.Exit(0); } - buffer = buffer.Slice(read); + buffer = buffer[read..]; } } } -internal class CastMemoryManager : MemoryManager +internal class CastMemoryManager(Memory from) : MemoryManager where TFrom : struct where TTo : struct { - private readonly Memory _from; - - public CastMemoryManager(Memory from) => _from = from; + private readonly Memory _from = from; public override Span GetSpan() => MemoryMarshal.Cast(_from.Span); diff --git a/src/remoting/dotnet-remoting/dotnet-remoting.csproj b/src/remoting/dotnet-remoting/dotnet-remoting.csproj index 197bc6e..702a3cb 100644 --- a/src/remoting/dotnet-remoting/dotnet-remoting.csproj +++ b/src/remoting/dotnet-remoting/dotnet-remoting.csproj @@ -26,7 +26,7 @@ - + diff --git a/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj b/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj index 6db8da7..390ee5d 100644 --- a/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj +++ b/tests/Nexus.Sources.Remote.Tests/Nexus.Sources.Remote.Tests.csproj @@ -13,20 +13,11 @@ - - Always - - - Always - - - - - - - - - + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive all diff --git a/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs b/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs index a2d47c9..79786d3 100644 --- a/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs +++ b/tests/Nexus.Sources.Remote.Tests/RemoteTests.cs @@ -8,263 +8,262 @@ using System.Text.Json.Nodes; using Xunit; -namespace Nexus.Sources.Tests +namespace Nexus.Sources.Tests; + +[Trait("TestCategory", "local")] +public class RemoteTests { - [Trait("TestCategory", "local")] - public class RemoteTests - { - [Theory] - [InlineData("dotnet run --project dotnet/remote.csproj localhost {remote-port}")] - [InlineData("python python/remote.py localhost {remote-port}")] + [Theory] + [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")] + [InlineData("python python/remote.py localhost {remote-port}")] #if LINUX - [InlineData("bash bash/remote.sh localhost {remote-port}")] + [InlineData("bash bash/remote.sh localhost {remote-port}")] #endif - public async Task ProvidesCatalog(string command) - { - // arrange - var dataSource = new Remote() as IDataSource; - var context = CreateContext(command); - - await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); - - // act - var actual = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None); - - // assert - var actualProperties1 = actual.Properties; - var actualIds = actual.Resources!.Select(resource => resource.Id).ToList(); - var actualUnits = actual.Resources!.Select(resource => resource.Properties?.GetStringValue("unit")).ToList(); - var actualGroups = actual.Resources!.SelectMany(resource => resource.Properties?.GetStringArray("groups")!); - var actualDataTypes = actual.Resources!.SelectMany(resource => resource.Representations!.Select(representation => representation.DataType)).ToList(); - - var expectedProperties1 = new Dictionary() { ["a"] = "b", ["c"] = 1 }; - var expectedIds = new List() { "resource1", "resource2" }; - var expectedUnits = new List() { "°C", "bar" }; - var expectedDataTypes = new List() { NexusDataType.INT64, NexusDataType.FLOAT64 }; - var expectedGroups = new List() { "group1", "group2" }; - - Assert.True(JsonSerializer.Serialize(actualProperties1) == JsonSerializer.Serialize(expectedProperties1)); - Assert.True(expectedIds.SequenceEqual(actualIds)); - Assert.True(expectedUnits.SequenceEqual(actualUnits)); - Assert.True(expectedGroups.SequenceEqual(actualGroups)); - Assert.True(expectedDataTypes.SequenceEqual(actualDataTypes)); - } + public async Task ProvidesCatalog(string command) + { + // arrange + var dataSource = new Remote() as IDataSource; + var context = CreateContext(command); + + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + + // act + var actual = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None); + + // assert + var actualProperties1 = actual.Properties; + var actualIds = actual.Resources!.Select(resource => resource.Id).ToList(); + var actualUnits = actual.Resources!.Select(resource => resource.Properties?.GetStringValue("unit")).ToList(); + var actualGroups = actual.Resources!.SelectMany(resource => resource.Properties?.GetStringArray("groups")!); + var actualDataTypes = actual.Resources!.SelectMany(resource => resource.Representations!.Select(representation => representation.DataType)).ToList(); + + var expectedProperties1 = new Dictionary() { ["a"] = "b", ["c"] = 1 }; + var expectedIds = new List() { "resource1", "resource2" }; + var expectedUnits = new List() { "°C", "bar" }; + var expectedDataTypes = new List() { NexusDataType.INT64, NexusDataType.FLOAT64 }; + var expectedGroups = new List() { "group1", "group2" }; + + Assert.True(JsonSerializer.Serialize(actualProperties1) == JsonSerializer.Serialize(expectedProperties1)); + Assert.True(expectedIds.SequenceEqual(actualIds)); + Assert.True(expectedUnits.SequenceEqual(actualUnits)); + Assert.True(expectedGroups.SequenceEqual(actualGroups)); + Assert.True(expectedDataTypes.SequenceEqual(actualDataTypes)); + } - [Theory] - [InlineData("dotnet run --project dotnet/remote.csproj localhost {remote-port}")] - [InlineData("python python/remote.py localhost {remote-port}")] + [Theory] + [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")] + [InlineData("python python/remote.py localhost {remote-port}")] #if LINUX - [InlineData("bash bash/remote.sh localhost {remote-port}")] + [InlineData("bash bash/remote.sh localhost {remote-port}")] #endif - public async Task CanProvideTimeRange(string command) - { - var dataSource = new Remote() as IDataSource; - var context = CreateContext(command); + public async Task CanProvideTimeRange(string command) + { + var dataSource = new Remote() as IDataSource; + var context = CreateContext(command); - var expectedBegin = new DateTime(2019, 12, 31, 12, 00, 00, DateTimeKind.Utc); - var expectedEnd = new DateTime(2020, 01, 02, 09, 50, 00, DateTimeKind.Utc); + var expectedBegin = new DateTime(2019, 12, 31, 12, 00, 00, DateTimeKind.Utc); + var expectedEnd = new DateTime(2020, 01, 02, 09, 50, 00, DateTimeKind.Utc); - await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); - var (begin, end) = await dataSource.GetTimeRangeAsync("/A/B/C", CancellationToken.None); + var (begin, end) = await dataSource.GetTimeRangeAsync("/A/B/C", CancellationToken.None); - Assert.Equal(expectedBegin, begin); - Assert.Equal(expectedEnd, end); - } + Assert.Equal(expectedBegin, begin); + Assert.Equal(expectedEnd, end); + } - [Theory] - [InlineData("dotnet run --project dotnet/remote.csproj localhost {remote-port}")] - [InlineData("python python/remote.py localhost {remote-port}")] + [Theory] + [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")] + [InlineData("python python/remote.py localhost {remote-port}")] #if LINUX - [InlineData("bash bash/remote.sh localhost {remote-port}")] + [InlineData("bash bash/remote.sh localhost {remote-port}")] #endif - public async Task CanProvideAvailability(string command) - { - var dataSource = new Remote() as IDataSource; - var context = CreateContext(command); + public async Task CanProvideAvailability(string command) + { + var dataSource = new Remote() as IDataSource; + var context = CreateContext(command); - await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); - var begin = new DateTime(2020, 01, 02, 00, 00, 00, DateTimeKind.Utc); - var end = new DateTime(2020, 01, 03, 00, 00, 00, DateTimeKind.Utc); - var actual = await dataSource.GetAvailabilityAsync("/A/B/C", begin, end, CancellationToken.None); + var begin = new DateTime(2020, 01, 02, 00, 00, 00, DateTimeKind.Utc); + var end = new DateTime(2020, 01, 03, 00, 00, 00, DateTimeKind.Utc); + var actual = await dataSource.GetAvailabilityAsync("/A/B/C", begin, end, CancellationToken.None); - Assert.Equal(2 / 144.0, actual, precision: 4); - } + Assert.Equal(2 / 144.0, actual, precision: 4); + } - [Theory] - [InlineData("dotnet run --project dotnet/remote.csproj localhost {remote-port}", true)] - [InlineData("python python/remote.py localhost {remote-port}", true)] + [Theory] + [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}", true)] + [InlineData("python python/remote.py localhost {remote-port}", true)] #if LINUX - [InlineData("bash bash/remote.sh localhost {remote-port}", false)] + [InlineData("bash bash/remote.sh localhost {remote-port}", false)] #endif - public async Task CanReadFullDay(string command, bool complexData) - { - var dataSource = new Remote() as IDataSource; - var context = CreateContext(command); + public async Task CanReadFullDay(string command, bool complexData) + { + var dataSource = new Remote() as IDataSource; + var context = CreateContext(command); - await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); - var catalog = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None); - var resource = catalog.Resources![0]; - var representation = resource.Representations![0]; + var catalog = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None); + var resource = catalog.Resources![0]; + var representation = resource.Representations![0]; - var catalogItem = new CatalogItem( - catalog with { Resources = default! }, - resource with { Representations = default! }, - representation, - default); + var catalogItem = new CatalogItem( + catalog with { Resources = default! }, + resource with { Representations = default! }, + representation, + default); - var begin = new DateTime(2019, 12, 31, 0, 0, 0, DateTimeKind.Utc); - var end = new DateTime(2020, 01, 03, 0, 0, 0, DateTimeKind.Utc); - var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); + var begin = new DateTime(2019, 12, 31, 0, 0, 0, DateTimeKind.Utc); + var end = new DateTime(2020, 01, 03, 0, 0, 0, DateTimeKind.Utc); + var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); - var length = 3 * 86400; - var expectedData = new long[length]; - var expectedStatus = new byte[length]; + var length = 3 * 86400; + var expectedData = new long[length]; + var expectedStatus = new byte[length]; - if (complexData) - { - void GenerateData(DateTimeOffset dateTime) - { - var data = Enumerable.Range(0, 600) - .Select(value => dateTime.Add(TimeSpan.FromSeconds(value)).ToUnixTimeSeconds()) - .ToArray(); - - var offset = (int)(dateTime - begin).TotalSeconds; - data.CopyTo(expectedData.AsSpan()[offset..]); - expectedStatus.AsSpan().Slice(offset, 600).Fill(1); - } - - GenerateData(new DateTimeOffset(2019, 12, 31, 12, 00, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2019, 12, 31, 12, 20, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2020, 01, 01, 00, 00, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2020, 01, 02, 09, 40, 0, 0, TimeSpan.Zero)); - GenerateData(new DateTimeOffset(2020, 01, 02, 09, 50, 0, 0, TimeSpan.Zero)); - } - else + if (complexData) + { + void GenerateData(DateTimeOffset dateTime) { - MemoryMarshal.AsBytes(expectedData.AsSpan()).Fill((byte)'d'); - expectedStatus.AsSpan().Fill((byte)'s'); - } + var data = Enumerable.Range(0, 600) + .Select(value => dateTime.Add(TimeSpan.FromSeconds(value)).ToUnixTimeSeconds()) + .ToArray(); - var request = new ReadRequest(catalogItem, data, status); - await dataSource.ReadAsync(begin, end, new ReadRequest[] { request }, default!, new Progress(), CancellationToken.None); - var longData = new CastMemoryManager(data).Memory; + var offset = (int)(dateTime - begin).TotalSeconds; + data.CopyTo(expectedData.AsSpan()[offset..]); + expectedStatus.AsSpan().Slice(offset, 600).Fill(1); + } - Assert.True(expectedData.SequenceEqual(longData.ToArray())); - Assert.True(expectedStatus.SequenceEqual(status.ToArray())); + GenerateData(new DateTimeOffset(2019, 12, 31, 12, 00, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2019, 12, 31, 12, 20, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2020, 01, 01, 00, 00, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2020, 01, 02, 09, 40, 0, 0, TimeSpan.Zero)); + GenerateData(new DateTimeOffset(2020, 01, 02, 09, 50, 0, 0, TimeSpan.Zero)); } + else + { + MemoryMarshal.AsBytes(expectedData.AsSpan()).Fill((byte)'d'); + expectedStatus.AsSpan().Fill((byte)'s'); + } + + var request = new ReadRequest(catalogItem, data, status); + await dataSource.ReadAsync(begin, end, [request], default!, new Progress(), CancellationToken.None); + var longData = new CastMemoryManager(data).Memory; - [Theory] - [InlineData("dotnet run --project dotnet/remote.csproj localhost {remote-port}")] - [InlineData("python python/remote.py localhost {remote-port}")] + Assert.True(expectedData.SequenceEqual(longData.ToArray())); + Assert.True(expectedStatus.SequenceEqual(status.ToArray())); + } + + [Theory] + [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")] + [InlineData("python python/remote.py localhost {remote-port}")] #if LINUX - [InlineData("bash bash/remote.sh localhost {remote-port}")] + [InlineData("bash bash/remote.sh localhost {remote-port}")] #endif - public async Task CanLog(string command) - { - var loggerMock = new Mock(); - var dataSource = new Remote() as IDataSource; - var context = CreateContext(command); - - await dataSource.SetContextAsync(context, loggerMock.Object, CancellationToken.None); - - loggerMock.Verify( - x => x.Log( - It.Is(logLevel => logLevel == LogLevel.Information), - It.IsAny(), - It.Is((message, _) => message.ToString() == "Logging works!"), - It.IsAny(), - It.Is>((v, t) => true) - ), - Times.Once - ); - } + public async Task CanLog(string command) + { + var loggerMock = new Mock(); + var dataSource = new Remote() as IDataSource; + var context = CreateContext(command); + + await dataSource.SetContextAsync(context, loggerMock.Object, CancellationToken.None); + + loggerMock.Verify( + x => x.Log( + It.Is(logLevel => logLevel == LogLevel.Information), + It.IsAny(), + It.Is((message, _) => message.ToString() == "Logging works!"), + It.IsAny(), + It.Is>((v, t) => true) + ), + Times.Once + ); + } - [Theory] - [InlineData("dotnet run --project dotnet/remote.csproj localhost {remote-port}")] - [InlineData("python python/remote.py localhost {remote-port}")] - public async Task CanReadDataHandler(string command) - { - var dataSource = new Remote() as IDataSource; - var context = CreateContext(command); + [Theory] + [InlineData("dotnet run --project ../../../../tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj localhost {remote-port}")] + [InlineData("python python/remote.py localhost {remote-port}")] + public async Task CanReadDataHandler(string command) + { + var dataSource = new Remote() as IDataSource; + var context = CreateContext(command); - await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); - var catalog = await dataSource.GetCatalogAsync("/D/E/F", CancellationToken.None); - var resource = catalog.Resources![0]; - var representation = resource.Representations![0]; + var catalog = await dataSource.GetCatalogAsync("/D/E/F", CancellationToken.None); + var resource = catalog.Resources![0]; + var representation = resource.Representations![0]; - var catalogItem = new CatalogItem( - catalog with { Resources = default! }, - resource with { Representations = default! }, - representation, - default); + var catalogItem = new CatalogItem( + catalog with { Resources = default! }, + resource with { Representations = default! }, + representation, + default); - var begin = new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc); - var end = new DateTime(2020, 01, 01, 0, 1, 0, DateTimeKind.Utc); - var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); + var begin = new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc); + var end = new DateTime(2020, 01, 01, 0, 1, 0, DateTimeKind.Utc); + var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); - var length = 60; + var length = 60; - var expectedData = Enumerable - .Range(0, length) - .Select(value => (double)value * 2) - .ToArray(); + var expectedData = Enumerable + .Range(0, length) + .Select(value => (double)value * 2) + .ToArray(); - var expectedStatus = Enumerable + var expectedStatus = Enumerable + .Range(0, length) + .Select(value => (byte)1) + .ToArray(); + + Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end, Memory buffer, CancellationToken cancellationToken) + { + var data = Enumerable .Range(0, length) - .Select(value => (byte)1) + .Select(value => (double)value) .ToArray(); - Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end, Memory buffer, CancellationToken cancellationToken) - { - var data = Enumerable - .Range(0, length) - .Select(value => (double)value) - .ToArray(); - - data.CopyTo(buffer); + data.CopyTo(buffer); - return Task.CompletedTask; - } + return Task.CompletedTask; + } - var request = new ReadRequest(catalogItem, data, status); - await dataSource.ReadAsync(begin, end, new ReadRequest[] { request }, HandleReadDataAsync, new Progress(), CancellationToken.None); - var doubleData = new CastMemoryManager(data).Memory; + var request = new ReadRequest(catalogItem, data, status); + await dataSource.ReadAsync(begin, end, [request], HandleReadDataAsync, new Progress(), CancellationToken.None); + var doubleData = new CastMemoryManager(data).Memory; - Assert.True(expectedData.SequenceEqual(doubleData.ToArray())); - Assert.True(expectedStatus.SequenceEqual(status.ToArray())); - } + Assert.True(expectedData.SequenceEqual(doubleData.ToArray())); + Assert.True(expectedStatus.SequenceEqual(status.ToArray())); + } - private static DataSourceContext CreateContext(string command) - { - return new DataSourceContext( - ResourceLocator: new Uri("file:///" + Path.Combine(Directory.GetCurrentDirectory(), "TESTDATA")), - SystemConfiguration: new Dictionary() + private static DataSourceContext CreateContext(string command) + { + return new DataSourceContext( + ResourceLocator: new Uri("file:///" + Path.Combine(Directory.GetCurrentDirectory(), "TESTDATA")), + SystemConfiguration: new Dictionary() + { + [typeof(Remote).FullName!] = JsonSerializer.SerializeToElement(new JsonObject() { - [typeof(Remote).FullName!] = JsonSerializer.SerializeToElement(new JsonObject() + ["templates"] = new JsonObject() { - ["templates"] = new JsonObject() - { - ["local"] = "{command}", - } - }) - }, - SourceConfiguration: new Dictionary() + ["local"] = "{command}", + } + }) + }, + SourceConfiguration: new Dictionary() + { + ["listen-address"] = JsonSerializer.SerializeToElement("127.0.0.1"), + ["listen-port-min"] = JsonSerializer.SerializeToElement("63000"), + ["template"] = JsonSerializer.SerializeToElement("local"), + ["command"] = JsonSerializer.SerializeToElement(command), + ["environment-variables"] = JsonSerializer.SerializeToElement(new JsonObject() { - ["listen-address"] = JsonSerializer.SerializeToElement("127.0.0.1"), - ["listen-port-min"] = JsonSerializer.SerializeToElement("63000"), - ["template"] = JsonSerializer.SerializeToElement("local"), - ["command"] = JsonSerializer.SerializeToElement(command), - ["environment-variables"] = JsonSerializer.SerializeToElement(new JsonObject() - { - ["PYTHONPATH"] = $"{Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "..", "..", "..", "src", "remoting", "python-remoting")}" - }) - }, - RequestConfiguration: default - ); - } + ["PYTHONPATH"] = $"{Path.Combine(Directory.GetCurrentDirectory(), "..", "..", "..", "..", "..", "src", "remoting", "python-remoting")}" + }) + }, + RequestConfiguration: default + ); } } \ No newline at end of file diff --git a/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs b/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs index 7975a87..7794580 100644 --- a/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs +++ b/tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs @@ -5,100 +5,99 @@ using System.Text.Json.Nodes; using Xunit; -namespace Nexus.Sources.Tests +namespace Nexus.Sources.Tests; + +[Trait("TestCategory", "docker")] +public class SetupDockerTests { - [Trait("TestCategory", "docker")] - public class SetupDockerTests - { #if LINUX - [Theory] - [InlineData("python", "main.py nexus-main {remote-port}", "v2.0.0-beta.1")] - [InlineData("dotnet", "nexus-remoting-sample.csproj nexus-main {remote-port}", "v2.0.0-beta.3")] + [Theory] + [InlineData("python", "main.py nexus-main {remote-port}", "v2.0.0-beta.1")] + [InlineData("dotnet", "nexus-remoting-sample.csproj nexus-main {remote-port}", "v2.0.0-beta.3")] #endif - public async Task CanReadFullDay(string satelliteId, string command, string version) - { - var dataSource = new Remote() as IDataSource; - var context = CreateContext(satelliteId, command, version); + public async Task CanReadFullDay(string satelliteId, string command, string version) + { + var dataSource = new Remote() as IDataSource; + var context = CreateContext(satelliteId, command, version); - await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); + await dataSource.SetContextAsync(context, NullLogger.Instance, CancellationToken.None); - var catalog = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None); - var resource = catalog.Resources![0]; - var representation = resource.Representations![0]; + var catalog = await dataSource.GetCatalogAsync("/A/B/C", CancellationToken.None); + var resource = catalog.Resources![0]; + var representation = resource.Representations![0]; - var catalogItem = new CatalogItem( - catalog with { Resources = default! }, - resource with { Representations = default! }, - representation, - default); + var catalogItem = new CatalogItem( + catalog with { Resources = default! }, + resource with { Representations = default! }, + representation, + default); - var begin = new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc); - var end = new DateTime(2020, 01, 01, 0, 0, 10, DateTimeKind.Utc); - var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); + var begin = new DateTime(2020, 01, 01, 0, 0, 0, DateTimeKind.Utc); + var end = new DateTime(2020, 01, 01, 0, 0, 10, DateTimeKind.Utc); + var (data, status) = ExtensibilityUtilities.CreateBuffers(representation, begin, end); - var length = 10; - var expectedData = new double[length]; - var expectedStatus = new byte[length]; + var length = 10; + var expectedData = new double[length]; + var expectedStatus = new byte[length]; - for (int i = 0; i < length; i++) - { - expectedData[i] = i * 2; - } + for (int i = 0; i < length; i++) + { + expectedData[i] = i * 2; + } - expectedStatus.AsSpan().Fill(1); + expectedStatus.AsSpan().Fill(1); - Task ReadData(string resourcePath, DateTime begin, DateTime end, Memory buffer, CancellationToken cancellationToken) + Task ReadData(string resourcePath, DateTime begin, DateTime end, Memory buffer, CancellationToken cancellationToken) + { + var spanBuffer = buffer.Span; + + for (int i = 0; i < length; i++) { - var spanBuffer = buffer.Span; + spanBuffer[i] = i; + } - for (int i = 0; i < length; i++) - { - spanBuffer[i] = i; - } + return Task.CompletedTask; + } - return Task.CompletedTask; - } + var request = new ReadRequest(catalogItem, data, status); - var request = new ReadRequest(catalogItem, data, status); - - await dataSource.ReadAsync( - begin, - end, - new ReadRequest[] { request }, - ReadData, - new Progress(), - CancellationToken.None); + await dataSource.ReadAsync( + begin, + end, + [request], + ReadData, + new Progress(), + CancellationToken.None); - var doubleData = new CastMemoryManager(data).Memory; + var doubleData = new CastMemoryManager(data).Memory; - Assert.True(expectedData.SequenceEqual(doubleData.ToArray())); - Assert.True(expectedStatus.SequenceEqual(status.ToArray())); - } + Assert.True(expectedData.SequenceEqual(doubleData.ToArray())); + Assert.True(expectedStatus.SequenceEqual(status.ToArray())); + } - private static DataSourceContext CreateContext(string satelliteId, string command, string version) - { - return new DataSourceContext( - ResourceLocator: default, - SystemConfiguration: new Dictionary() + private static DataSourceContext CreateContext(string satelliteId, string command, string version) + { + return new DataSourceContext( + ResourceLocator: default, + SystemConfiguration: new Dictionary() + { + [typeof(Remote).FullName!] = JsonSerializer.SerializeToElement(new JsonObject() { - [typeof(Remote).FullName!] = JsonSerializer.SerializeToElement(new JsonObject() + ["templates"] = new JsonObject() { - ["templates"] = new JsonObject() - { - ["docker"] = $"ssh root@nexus-{satelliteId} bash run.sh {{git-url}} {{git-tag}} {{command}}", - } - }) - }, - SourceConfiguration: new Dictionary() - { - ["listen-address"] = JsonSerializer.SerializeToElement("0.0.0.0"), - ["template"] = JsonSerializer.SerializeToElement("docker"), - ["command"] = JsonSerializer.SerializeToElement(command), - ["git-url"] = JsonSerializer.SerializeToElement($"https://github.com/malstroem-labs/nexus-remoting-template-{satelliteId}"), - ["git-tag"] = JsonSerializer.SerializeToElement(version) - }, - RequestConfiguration: default - ); - } + ["docker"] = $"ssh root@nexus-{satelliteId} bash run.sh {{git-url}} {{git-tag}} {{command}}", + } + }) + }, + SourceConfiguration: new Dictionary() + { + ["listen-address"] = JsonSerializer.SerializeToElement("0.0.0.0"), + ["template"] = JsonSerializer.SerializeToElement("docker"), + ["command"] = JsonSerializer.SerializeToElement(command), + ["git-url"] = JsonSerializer.SerializeToElement($"https://github.com/malstroem-labs/nexus-remoting-template-{satelliteId}"), + ["git-tag"] = JsonSerializer.SerializeToElement(version) + }, + RequestConfiguration: default + ); } } diff --git a/tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj b/tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj index b772e95..5bd68a8 100644 --- a/tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj +++ b/tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj @@ -6,11 +6,11 @@ - + - + diff --git a/version.json b/version.json index 78a3859..d5c1679 100644 --- a/version.json +++ b/version.json @@ -1,4 +1,4 @@ { "version": "2.0.0", - "suffix": "beta.2" + "suffix": "beta.24" } \ No newline at end of file