Skip to content

Commit

Permalink
加回 .NET Framework 的支持
Browse files Browse the repository at this point in the history
  • Loading branch information
lindexi committed Sep 13, 2023
1 parent 705fcaa commit 053851b
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System.IO;
using System.IO;
using System.Text;
using dotnetCampus.Ipc.Messages;

Expand All @@ -23,4 +22,3 @@ protected IpcMessage ToIpcMessage(MemoryStream stream, string tag = "")
return new IpcMessage(tag, new IpcMessageBody(buffer, start: 0, length));
}
}
#endif
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System;
using System;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Text;
Expand Down Expand Up @@ -140,17 +139,27 @@ private bool TryHandleMessage(in IpcMessage ipcMessage, [NotNullWhen(true)] out
return false;
}

protected readonly record struct IpcDirectRoutedMessage(string RoutedPath, MemoryStream Stream,
IpcMessage PayloadIpcMessage)
protected readonly struct IpcDirectRoutedMessage
{
public IpcDirectRoutedMessage(string routedPath, MemoryStream stream,
IpcMessage payloadIpcMessage)
{
this.RoutedPath = routedPath;
this.Stream = stream;
this.PayloadIpcMessage = payloadIpcMessage;
}

public IpcMessageBody GetData()
{
var position = (int) Stream.Position;
var payload = PayloadIpcMessage.Body;
var data = new IpcMessageBody(payload.Buffer, payload.Start + position, payload.Length - position);
return data;
}

public string RoutedPath { get; }
public MemoryStream Stream { get; }
public IpcMessage PayloadIpcMessage { get; }
}
}

#endif
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System.IO;
using System.IO;

namespace dotnetCampus.Ipc.IpcRouteds.DirectRouteds;

Expand All @@ -11,4 +10,3 @@ public static void WriteHeader(BinaryWriter writer, ulong businessMessageHeader,
writer.Write(routedPath);
}
}
#endif
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System.IO;
using System.IO;
using System.Text;
using System.Threading.Tasks;

Expand Down Expand Up @@ -33,7 +32,16 @@ public Task NotifyAsync<T>(string routedPath, T obj) where T : class
var responseMessage = await _peerProxy.GetResponseAsync(ipcMessage);

using var memoryStream = new MemoryStream(responseMessage.Body.Buffer, responseMessage.Body.Start, responseMessage.Body.Length, writable: false);
using StreamReader reader = new StreamReader(memoryStream, leaveOpen: true);
using StreamReader reader = new StreamReader
(
memoryStream,
#if !NETCOREAPP
Encoding.UTF8,
detectEncodingFromByteOrderMarks: false,
bufferSize: 2048,
#endif
leaveOpen: true
);
JsonReader jsonReader = new JsonTextReader(reader);
return JsonSerializer.Deserialize<TResponse>(jsonReader);
}
Expand All @@ -43,7 +51,7 @@ private IpcMessage BuildMessage(string routedPath, object obj)
using var memoryStream = new MemoryStream();
WriteHeader(memoryStream, routedPath);

using (var textWriter = new StreamWriter(memoryStream, Encoding.UTF8, leaveOpen: true))
using (var textWriter = new StreamWriter(memoryStream, Encoding.UTF8, leaveOpen: true, bufferSize: 2048))
{
JsonSerializer.Serialize(textWriter, obj);
}
Expand All @@ -53,4 +61,3 @@ private IpcMessage BuildMessage(string routedPath, object obj)

protected override ulong BusinessHeader => (ulong) KnownMessageHeaders.JsonIpcDirectRoutedMessageHeader;
}
#endif
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
#if NET6_0_OR_GREATER
namespace dotnetCampus.Ipc.IpcRouteds.DirectRouteds;
namespace dotnetCampus.Ipc.IpcRouteds.DirectRouteds;

public record JsonIpcDirectRoutedContext(string PeerName);
#endif
public class JsonIpcDirectRoutedContext
{
public JsonIpcDirectRoutedContext(string peerName)
{
this.PeerName = peerName;
}

public string PeerName { get; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
Expand All @@ -16,6 +15,10 @@

using Newtonsoft.Json;

#if !NETCOREAPP
using ValueTask = System.Threading.Tasks.Task;
#endif

namespace dotnetCampus.Ipc.IpcRouteds.DirectRouteds;

/// <summary>
Expand Down Expand Up @@ -133,7 +136,8 @@ void HandleNotify(MemoryStream stream, JsonIpcDirectRoutedContext context)
protected override void OnHandleNotify(IpcDirectRoutedMessage message, PeerMessageArgs e)
{
// 接下来进行调度
var (routedPath, stream, _) = message;
var routedPath = message.RoutedPath;
var stream = message.Stream;
if (HandleNotifyDictionary.TryGetValue(routedPath, out var handleNotify))
{
var context = new JsonIpcDirectRoutedContext(e.PeerName);
Expand Down Expand Up @@ -228,12 +232,26 @@ public void AddRequestHandler<TRequest, TResponse>(string routedPath,
throw new InvalidOperationException($"重复添加对 {routedPath} 的处理");
}

#if NETCOREAPP
async ValueTask<IpcMessage> HandleRequest(MemoryStream stream, JsonIpcDirectRoutedContext context)
#else
async Task<IpcMessage> HandleRequest(MemoryStream stream, JsonIpcDirectRoutedContext context)
#endif
{
var argument = ToObject<TRequest>(stream);
var response = await handler(argument!, context);
var responseMemoryStream = new MemoryStream();
using (TextWriter textWriter = new StreamWriter(responseMemoryStream, Encoding.UTF8, leaveOpen: true))
using
(
TextWriter textWriter = new StreamWriter
(
responseMemoryStream, Encoding.UTF8
#if !NETCOREAPP
, bufferSize: 2048
#endif
, leaveOpen: true
)
)
{
JsonSerializer.Serialize(textWriter, response);
}
Expand All @@ -244,14 +262,19 @@ async ValueTask<IpcMessage> HandleRequest(MemoryStream stream, JsonIpcDirectRout
}
}

#if NETCOREAPP
private delegate ValueTask<IpcMessage> HandleRequest(MemoryStream stream, JsonIpcDirectRoutedContext context);
#else
private delegate Task<IpcMessage> HandleRequest(MemoryStream stream, JsonIpcDirectRoutedContext context);
#endif

private ConcurrentDictionary<string, HandleRequest> HandleRequestDictionary { get; } =
new ConcurrentDictionary<string, HandleRequest>();

protected override async Task<IIpcResponseMessage> OnHandleRequestAsync(IpcDirectRoutedMessage message, IIpcRequestContext requestContext)
{
var (routedPath, stream, _) = message;
var routedPath = message.RoutedPath;
var stream = message.Stream;

if (HandleRequestDictionary.TryGetValue(routedPath, out var handler))
{
Expand Down Expand Up @@ -284,18 +307,26 @@ protected override async Task<IIpcResponseMessage> OnHandleRequestAsync(IpcDirec
}
}

#endregion
#endregion

private JsonSerializer JsonSerializer => _jsonSerializer ??= JsonSerializer.CreateDefault();
private JsonSerializer? _jsonSerializer;
private ILogger Logger => IpcProvider.IpcContext.Logger;

Check warning on line 314 in src/dotnetCampus.Ipc/IpcRouteds/DirectRouteds/Json_/JsonIpcDirectRoutedProvider.cs

View workflow job for this annotation

GitHub Actions / build

'JsonIpcDirectRoutedProvider.Logger' hides inherited member 'IpcDirectRoutedProviderBase.Logger'. Use the new keyword if hiding was intended.

private T? ToObject<T>(MemoryStream stream)
{
using StreamReader reader = new StreamReader(stream, leaveOpen: true);
using StreamReader reader = new StreamReader
(
stream,
#if !NETCOREAPP
Encoding.UTF8,
bufferSize: 2048,
detectEncodingFromByteOrderMarks: false,
#endif
leaveOpen: true
);
JsonReader jsonReader = new JsonTextReader(reader);
return JsonSerializer.Deserialize<T>(jsonReader);
}
}

#endif
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System;
using System;
using System.IO;
using System.Threading.Tasks;
using dotnetCampus.Ipc.Context;
Expand All @@ -19,6 +18,11 @@ public RawByteIpcDirectRoutedClientProxy(IPeerProxy peerProxy)

private readonly IPeerProxy _peerProxy;

#if NETCOREAPP

public Task NotfiyAsync(string routedPath, in IpcMessageBody data)
=> NotfiyAsync(routedPath, data.AsSpan());

public Task NotfiyAsync(string routedPath, Span<byte> data)
{
IpcMessage ipcMessage = BuildMessage(routedPath, data);
Expand All @@ -42,7 +46,30 @@ private IpcMessage BuildMessage(string routedPath, Span<byte> data)

return ToIpcMessage(memoryStream, $"Message To {routedPath}");
}
#else
public Task NotfiyAsync(string routedPath, IpcMessageBody data)
{
IpcMessage ipcMessage = BuildMessage(routedPath, data);
return _peerProxy.NotifyAsync(ipcMessage);
}

public async Task<IpcMessageBody> GetResponseAsync(string routedPath, IpcMessageBody ipcMessageBody)
{
IpcMessage ipcMessage = BuildMessage(routedPath, ipcMessageBody);

var response = await _peerProxy.GetResponseAsync(ipcMessage);
return response.Body;
}

private IpcMessage BuildMessage(string routedPath, in IpcMessageBody data)
{
using var memoryStream = new MemoryStream();
WriteHeader(memoryStream, routedPath);

memoryStream.Write(data.Buffer, data.Start, data.Length);

return ToIpcMessage(memoryStream, $"Message To {routedPath}");
}
#endif
protected override ulong BusinessHeader => (ulong) KnownMessageHeaders.RawByteIpcDirectRoutedMessageHeader;
}
#endif
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#if NET6_0_OR_GREATER
using System;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;
Expand Down Expand Up @@ -211,4 +210,3 @@ protected override async Task<IIpcResponseMessage> OnHandleRequestAsync(IpcDirec
}
}
}
#endif

0 comments on commit 053851b

Please sign in to comment.