Skip to content

Commit

Permalink
Follow Nexus changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Apollo3zehn committed Sep 26, 2023
1 parent 457d322 commit 0bb0928
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/Nexus.Sources.Remote/Nexus.Sources.Remote.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.8">
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.19">
<ExcludeAssets>runtime;native</ExcludeAssets>
</PackageReference>

Expand Down
37 changes: 33 additions & 4 deletions src/Nexus.Sources.Remote/Remote.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using Microsoft.Extensions.Logging;
using Nexus.DataModel;
using Nexus.Extensibility;
using System.Buffers;
using System.Net;
using System.Reflection;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Text.RegularExpressions;
Expand Down Expand Up @@ -91,7 +93,7 @@ public async Task SetContextAsync(
if (requestConfiguration.TryGetValue("environment-variables", out var propertyValue) &&
propertyValue.ValueKind == JsonValueKind.Object)
{
var environmentVariablesRaw = propertyValue .Deserialize<Dictionary<string, JsonElement>>();
var environmentVariablesRaw = propertyValue.Deserialize<Dictionary<string, JsonElement>>();

if (environmentVariablesRaw is not null)
environmentVariables = environmentVariablesRaw
Expand Down Expand Up @@ -246,18 +248,45 @@ private string BuildCommand(string templateId)
return command;
}

// copy from Nexus -> DataModelUtilities
private static readonly Regex _resourcePathEvaluator = new(@"^(?'catalog'.*)\/(?'resource'.*)\/(?'sample_period'[0-9]+_[a-zA-Z]+)(?:_(?'kind'[^\(#\s]+))?(?:\((?'parameters'.*)\))?(?:#(?'fragment'.*))?$", RegexOptions.Compiled);

private static MethodInfo _toSamplePeriodMethodInfo = typeof(DataModelExtensions)
.GetMethod("ToSamplePeriod", BindingFlags.Static | BindingFlags.NonPublic) ?? throw new Exception("Unable to locate ToSamplePeriod method.");

private async Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end)
{
// copy of _readData handler
var localReadData = _readData;

if (localReadData is null)
throw new InvalidOperationException("Unable to read data without previous invocation of the ReadAsync method.");

// timeout token source
var timeoutTokenSource = GetTimeoutTokenSource(TimeSpan.FromMinutes(1));
var data = await localReadData(resourcePath, begin, end, timeoutTokenSource.Token);
var byteData = new CastMemoryManager<double, byte>(MemoryMarshal.AsMemory(data)).Memory;

await _communicator.WriteRawAsync(byteData, timeoutTokenSource.Token);
// find sample period
var match = _resourcePathEvaluator.Match(resourcePath);

if (!match.Success)
throw new Exception("Invalid resource path");

var samplePeriod = (TimeSpan)_toSamplePeriodMethodInfo.Invoke(null, new object[] {
match.Groups["sample_period"].Value
})!;

// find buffer length and rent buffer
var length = (int)((end - begin).Ticks / samplePeriod.Ticks);

using var memoryOwner = MemoryPool<double>.Shared.Rent(length);
var buffer = memoryOwner.Memory.Slice(0, length);

// read data
await localReadData(resourcePath, begin, end, buffer, timeoutTokenSource.Token);
var byteBuffer = new CastMemoryManager<double, byte>(buffer).Memory;

// write to communicator
await _communicator.WriteRawAsync(byteBuffer, timeoutTokenSource.Token);
}

private static CancellationTokenSource GetTimeoutTokenSource(TimeSpan timeout)
Expand Down
39 changes: 21 additions & 18 deletions src/remoting/dotnet-remoting/Remoting.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public RemoteCommunicator(IDataSource dataSource, string address, int port)
/// <returns></returns>
public async Task RunAsync()
{
static JsonElement Read(byte[] jsonRequest)
static JsonElement Read(Span<byte> jsonRequest)
{
var reader = new Utf8JsonReader(jsonRequest);
return JsonSerializer.Deserialize<JsonElement>(ref reader, Utilities.Options);
Expand All @@ -109,8 +109,12 @@ static JsonElement Read(byte[] jsonRequest)

// get request message
var size = ReadSize(_tcpCommSocketStream);
var jsonRequest = _tcpCommSocketStream.ReadExactly(size, _logger);
var request = Read(jsonRequest);

using var memoryOwner = MemoryPool<byte>.Shared.Rent(size);
var messageMemory = memoryOwner.Memory.Slice(0, size);

_tcpCommSocketStream.ReadExactly(messageMemory.Span, _logger);
var request = Read(messageMemory.Span);

// process message
Memory<byte> data = default;
Expand Down Expand Up @@ -334,10 +338,11 @@ await _dataSource.ReadAsync(
return (result, data, status);
}

private async Task<ReadOnlyMemory<double>> HandleReadDataAsync(
private async Task HandleReadDataAsync(
string resourcePath,
DateTime begin,
DateTime end,
Memory<double> buffer,
CancellationToken cancellationToken)
{
var readDataRequest = new JsonObject()
Expand All @@ -352,17 +357,20 @@ private async Task<ReadOnlyMemory<double>> HandleReadDataAsync(
await Utilities.SendToServerAsync(readDataRequest, _tcpCommSocketStream);

var size = ReadSize(_tcpDataSocketStream);

if (size != buffer.Length * sizeof(double))
throw new Exception("Data returned by Nexus have an unexpected length");

_logger.LogTrace("Try to read {ByteCount} bytes from Nexus", size);

var data = _tcpDataSocketStream.ReadExactly(size, _logger);

return new CastMemoryManager<byte, double>(data).Memory;
_tcpDataSocketStream.ReadExactly(MemoryMarshal.AsBytes(buffer.Span), _logger);
}

private int ReadSize(NetworkStream currentStream)
{
var sizeBuffer = currentStream.ReadExactly(4, _logger);
Array.Reverse(sizeBuffer);
Span<byte> sizeBuffer = stackalloc byte[4];
currentStream.ReadExactly(sizeBuffer, _logger);
MemoryExtensions.Reverse(sizeBuffer);

var size = BitConverter.ToInt32(sizeBuffer);
return size;
Expand Down Expand Up @@ -408,25 +416,20 @@ public static async Task SendToServerAsync(JsonNode response, NetworkStream curr

internal static class StreamExtensions
{
public static byte[] ReadExactly(this Stream stream, int count, ILogger logger)
public static void ReadExactly(this Stream stream, Span<byte> buffer, ILogger logger)
{
var buffer = new byte[count];
var offset = 0;

while (offset < count)
while (buffer.Length > 0)
{
var read = stream.Read(buffer, offset, count - offset);
var read = stream.Read(buffer);

if (read == 0)
{
logger.LogDebug("No data from Nexus received (exiting)");
Environment.Exit(0);
}

offset += read;
buffer = buffer.Slice(read);
}

return buffer;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/remoting/dotnet-remoting/dotnet-remoting.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.8" />
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.19" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.5.0" />
<PackageReference Include="Moq" Version="4.18.4" />
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.8" />
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.19" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
Expand Down
8 changes: 5 additions & 3 deletions tests/Nexus.Sources.Remote.Tests/RemoteTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,16 @@ public async Task CanReadDataHandler(string command)
.Select(value => (byte)1)
.ToArray();

Task<ReadOnlyMemory<double>> HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end, CancellationToken cancellationToken)
Task HandleReadDataAsync(string resourcePath, DateTime begin, DateTime end, Memory<double> buffer, CancellationToken cancellationToken)
{
ReadOnlyMemory<double> data = Enumerable
var data = Enumerable
.Range(0, length)
.Select(value => (double)value)
.ToArray();

return Task.FromResult(data);
data.CopyTo(buffer);

return Task.CompletedTask;
}

var request = new ReadRequest(catalogItem, data, status);
Expand Down
8 changes: 4 additions & 4 deletions tests/Nexus.Sources.Remote.Tests/SetupDockerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ public async Task CanReadFullDay(string satelliteId, string command, string vers

expectedStatus.AsSpan().Fill(1);

Task<ReadOnlyMemory<double>> ReadData(string resourcePath, DateTime begin, DateTime end, CancellationToken cancellationToken)
Task ReadData(string resourcePath, DateTime begin, DateTime end, Memory<double> buffer, CancellationToken cancellationToken)
{
var buffer = new double[length];
var spanBuffer = buffer.Span;

for (int i = 0; i < length; i++)
{
buffer[i] = i;
spanBuffer[i] = i;
}

return Task.FromResult(new ReadOnlyMemory<double>(buffer));
return Task.CompletedTask;
}

var request = new ReadRequest(catalogItem, data, status);
Expand Down
2 changes: 1 addition & 1 deletion tests/Nexus.Sources.Remote.Tests/dotnet/remote.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.8" />
<PackageReference Include="Nexus.Extensibility" Version="2.0.0-beta.19" />
</ItemGroup>

<ItemGroup>
Expand Down

0 comments on commit 0bb0928

Please sign in to comment.