Skip to content

Commit

Permalink
Cleanup a few websocket things (#3625)
Browse files Browse the repository at this point in the history
  • Loading branch information
nirinchev authored Jun 24, 2024
1 parent be05efc commit 925b373
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 105 deletions.
1 change: 0 additions & 1 deletion Realm/Realm/Native/PrimitiveValue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

using System;
using System.Buffers;
using System.Collections;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.InteropServices;
Expand Down
44 changes: 12 additions & 32 deletions Realm/Realm/Native/SyncSocketProvider.EventLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
////////////////////////////////////////////////////////////////////////////

using System;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -48,56 +47,37 @@ internal void Cancel()
_cts.Dispose();
}

private class Work : IWork
private class Work(IntPtr nativeCallback, CancellationToken cancellationToken)
: IWork
{
private readonly IntPtr _nativeCallback;
private readonly CancellationToken _cancellationToken;

public Work(IntPtr nativeCallback, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_cancellationToken = cancellationToken;
}

public void Execute()
{
var status = Status.OK;
if (_cancellationToken.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
status = new(ErrorCode.OperationAborted, "Timer canceled");
}

RunCallback(_nativeCallback, status);
RunCallback(nativeCallback, status);
}
}
}

private class EventLoopWork : IWork
// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private class EventLoopWork(IntPtr nativeCallback, CancellationToken cancellationToken)
: IWork
{
private readonly IntPtr _nativeCallback;
private readonly Status _status;

// Belongs to SyncSocketProvider. When Native destroys the Provider we need to stop executing
// enqueued work, but we need to release all the callbacks we copied on the heap.
private readonly CancellationToken _cancellationToken;

public EventLoopWork(IntPtr nativeCallback, Status status, CancellationToken cancellationToken)
{
_nativeCallback = nativeCallback;
_status = status;
_cancellationToken = cancellationToken;
}

public void Execute()
{
if (_cancellationToken.IsCancellationRequested)
if (cancellationToken.IsCancellationRequested)
{
Logger.LogDefault(LogLevel.Trace, "Deleting EventLoopWork callback only because event loop was cancelled.");
NativeMethods.delete_callback(_nativeCallback);
NativeMethods.delete_callback(nativeCallback);
return;
}

RunCallback(_nativeCallback, _status);
RunCallback(nativeCallback, Status.OK);
}
}

Expand All @@ -112,7 +92,7 @@ private static void RunCallback(IntPtr nativeCallback, Status status)
private async Task PostWorkAsync(IntPtr nativeCallback)
{
Logger.LogDefault(LogLevel.Trace, "Posting work to SyncSocketProvider event loop.");
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, Status.OK, _cts.Token));
await _workQueue.Writer.WriteAsync(new EventLoopWork(nativeCallback, _cts.Token));
}

private async partial Task WorkThread()
Expand Down
76 changes: 21 additions & 55 deletions Realm/Realm/Native/SyncSocketProvider.WebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
using System.Buffers;
using System.IO;
using System.Net.WebSockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Channels;
Expand Down Expand Up @@ -68,7 +67,7 @@ private async Task ReadThread()
{
var builder = new StringBuilder();
FormatExceptionForLogging(e, builder);
Logger.LogDefault(LogLevel.Error, "Error establishing WebSocket connection " + builder.ToString());
Logger.LogDefault(LogLevel.Error, $"Error establishing WebSocket connection {builder}");

await _workQueue.WriteAsync(new WebSocketClosedWork(false, (WebSocketCloseStatus)RLM_ERR_WEBSOCKET_CONNECTION_FAILED, e.Message, _observer, _cancellationToken));
return;
Expand All @@ -86,9 +85,9 @@ private async Task ReadThread()
await _receiveBuffer.WriteAsync(buffer, 0, result.Count);
if (result.EndOfMessage)
{
var current_buffer = _receiveBuffer;
var currentBuffer = _receiveBuffer;
_receiveBuffer = new MemoryStream();
await _workQueue.WriteAsync(new BinaryMessageReceivedWork(current_buffer, _observer, _cancellationToken));
await _workQueue.WriteAsync(new BinaryMessageReceivedWork(currentBuffer, _observer, _cancellationToken));
}

break;
Expand All @@ -105,7 +104,7 @@ private async Task ReadThread()
{
var builder = new StringBuilder();
FormatExceptionForLogging(e, builder);
Logger.LogDefault(LogLevel.Error, "Error reading from WebSocket " + builder.ToString());
Logger.LogDefault(LogLevel.Error, $"Error reading from WebSocket {builder}");

await _workQueue.WriteAsync(new WebSocketClosedWork(false, (WebSocketCloseStatus)RLM_ERR_WEBSOCKET_READ_ERROR, e.Message, _observer, _cancellationToken));
return;
Expand All @@ -123,7 +122,6 @@ public async void Write(BinaryValue data, IntPtr native_callback)

var buffer = data.AsBytes(usePooledArray: true);

var status = Status.OK;
try
{
await _webSocket.SendAsync(new(buffer), WebSocketMessageType.Binary, true, _cancellationToken);
Expand All @@ -132,7 +130,7 @@ public async void Write(BinaryValue data, IntPtr native_callback)
{
var builder = new StringBuilder();
FormatExceptionForLogging(e, builder);
Logger.LogDefault(LogLevel.Error, "Error writing to WebSocket " + builder.ToString());
Logger.LogDefault(LogLevel.Error, $"Error writing to WebSocket {builder}");

// in case of errors notify the websocket observer and just dispose the callback
await _workQueue.WriteAsync(new WebSocketClosedWork(false, (WebSocketCloseStatus)RLM_ERR_WEBSOCKET_WRITE_ERROR, e.Message, _observer, _cancellationToken));
Expand All @@ -144,7 +142,7 @@ public async void Write(BinaryValue data, IntPtr native_callback)
ArrayPool<byte>.Shared.Return(buffer);
}

await _workQueue.WriteAsync(new EventLoopWork(native_callback, status, _cancellationToken));
await _workQueue.WriteAsync(new EventLoopWork(native_callback, _cancellationToken));
}

public async void Dispose()
Expand Down Expand Up @@ -201,98 +199,66 @@ private static void FormatExceptionForLogging(Exception ex, StringBuilder builde
FormatExceptionForLogging(inner, builder, nesting + 1);
}
}
else if (ex.InnerException is Exception inner)
else if (ex.InnerException is { } inner)
{
FormatExceptionForLogging(inner, builder, nesting + 1);
}
}
}

private abstract class WebSocketWork : IWork
private abstract class WebSocketWork(IntPtr observer, CancellationToken cancellationToken)
: IWork
{
private readonly IntPtr _observer;

// Belongs to the Socket and canceled when Native destroys the socket.
// If it's canceled we shouldn't call any observer methods.
private readonly CancellationToken _cancellationToken;

protected WebSocketWork(IntPtr observer, CancellationToken cancellationToken)
{
_observer = observer;
_cancellationToken = cancellationToken;
}
private readonly CancellationToken _cancellationToken = cancellationToken;

protected abstract void Execute(IntPtr observer);

void IWork.Execute()
{
if (!_cancellationToken.IsCancellationRequested)
{
Execute(_observer);
Execute(observer);
}
}
}

private sealed class WebSocketConnectedWork : WebSocketWork
private sealed class WebSocketConnectedWork(string protocol, IntPtr observer, CancellationToken cancellationToken)
: WebSocketWork(observer, cancellationToken)
{
private readonly string _protocol;

public WebSocketConnectedWork(string protocol, IntPtr observer, CancellationToken cancellationToken)
: base(observer, cancellationToken)
{
_protocol = protocol;
}

protected override void Execute(IntPtr observer)
{
using var arena = new Arena();
NativeMethods.observer_connected_handler(observer, StringValue.AllocateFrom(_protocol, arena));
NativeMethods.observer_connected_handler(observer, StringValue.AllocateFrom(protocol, arena));
}
}

private sealed class BinaryMessageReceivedWork : WebSocketWork
private sealed class BinaryMessageReceivedWork(MemoryStream receiveBuffer, IntPtr observer, CancellationToken cancellationToken)
: WebSocketWork(observer, cancellationToken)
{
private readonly MemoryStream _receiveBuffer;

public BinaryMessageReceivedWork(MemoryStream receiveBuffer, IntPtr observer, CancellationToken cancellationToken)
: base(observer, cancellationToken)
{
_receiveBuffer = receiveBuffer;
}

protected unsafe override void Execute(IntPtr observer)
{
using var buffer = _receiveBuffer;
using var buffer = receiveBuffer;
fixed (byte* data = buffer.GetBuffer())
{
NativeMethods.observer_binary_message_received(observer, new() { data = data, size = (IntPtr)buffer.Length });
}
}
}

private sealed class WebSocketClosedWork : WebSocketWork
private sealed class WebSocketClosedWork(bool clean, WebSocketCloseStatus status, string description, IntPtr observer, CancellationToken cancellationToken)
: WebSocketWork(observer, cancellationToken)
{
private readonly bool _clean;
private readonly WebSocketCloseStatus _status;
private readonly string _description;

public WebSocketClosedWork(bool clean, WebSocketCloseStatus status, string description, IntPtr observer, CancellationToken cancellationToken)
: base(observer, cancellationToken)
{
_clean = clean;
_status = status;
_description = description;
}

protected override void Execute(IntPtr observer)
{
if (!_clean)
if (!clean)
{
NativeMethods.observer_error_handler(observer);
}

using var arena = new Arena();
NativeMethods.observer_closed_handler(observer, _clean, _status, StringValue.AllocateFrom(_description, arena));
NativeMethods.observer_closed_handler(observer, clean, status, StringValue.AllocateFrom(description, arena));
}
}
}
30 changes: 13 additions & 17 deletions Realm/Realm/Native/SyncSocketProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ private static IntPtr WebSocketConnect(IntPtr managed_provider, IntPtr observer,

provider._onWebSocketConnection?.Invoke(webSocket.Options);

var builder = new UriBuilder();
builder.Scheme = endpoint.is_ssl ? "wss" : "ws";
builder.Host = endpoint.address;
builder.Port = endpoint.port;
var builder = new UriBuilder
{
Scheme = endpoint.is_ssl ? "wss" : "ws",
Host = endpoint.address!,
Port = endpoint.port
};

if (endpoint.path)
{
var pathAndQuery = ((string)endpoint.path)!.Split('?');
builder.Path = pathAndQuery.ElementAtOrDefault(0);
builder.Query = pathAndQuery.ElementAtOrDefault(1);
builder.Path = pathAndQuery.ElementAtOrDefault(0) ?? string.Empty;
builder.Query = pathAndQuery.ElementAtOrDefault(1) ?? String.Empty;
}

var socket = new Socket(webSocket, observer, provider._workQueue, builder.Uri);
Expand Down Expand Up @@ -128,18 +131,11 @@ static SyncSocketProvider()
NativeMethods.install_callbacks(post, dispose, create_timer, cancel_timer, websocket_connect, websocket_write, websocket_close);
}

private struct Status
private struct Status(ErrorCode code, string? reason)
{
internal ErrorCode Code;
internal string? Reason;
internal static readonly Status OperationAborted = new(ErrorCode.OperationAborted, "Operation canceled");
internal static readonly Status OK = new() { Code = ErrorCode.Ok };

public Status(ErrorCode code, string reason)
{
Code = code;
Reason = reason;
}
internal readonly string? Reason = reason;
internal readonly ErrorCode Code = code;
internal static readonly Status OK = new(ErrorCode.Ok, null);
}

/// <summary>
Expand Down

0 comments on commit 925b373

Please sign in to comment.