From e7cce580668fa806d3d9df2236fa9e134f08c060 Mon Sep 17 00:00:00 2001 From: Skipcast Date: Sun, 16 Jan 2022 23:35:29 +0100 Subject: [PATCH 1/2] - Steam messages are handled async again and game related code is ran on game thread and can be awaited - Fixed a freeze crash when player state handler and transmitter executed at the same time --- .../Game/Components/PlayerStateTransmitter.cs | 7 +---- JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs | 1 + .../Handlers/DisconnectedMessageHandler.cs | 6 ++-- .../Handlers/HandshakeRequestHandler.cs | 30 +++++++++++-------- .../Handlers/HandshakeResponseHandler.cs | 8 +++-- .../Handlers/LocalChatMessageHandler.cs | 5 +++- .../Handlers/PlayerStateChangedHandler.cs | 17 ++++++----- .../Networking/P2PManager.cs | 18 ++++++----- JKMP.Plugin.Multiplayer/Steam/SteamManager.cs | 2 +- JKMP.Plugin.Multiplayer/Threading/Mutex.cs | 16 ++++++++++ 10 files changed, 71 insertions(+), 39 deletions(-) diff --git a/JKMP.Plugin.Multiplayer/Game/Components/PlayerStateTransmitter.cs b/JKMP.Plugin.Multiplayer/Game/Components/PlayerStateTransmitter.cs index fb7569c..27689a1 100644 --- a/JKMP.Plugin.Multiplayer/Game/Components/PlayerStateTransmitter.cs +++ b/JKMP.Plugin.Multiplayer/Game/Components/PlayerStateTransmitter.cs @@ -77,12 +77,7 @@ protected override void LateUpdate(float delta) private void SendState() { if (body == null || listener == null) - throw new InvalidOperationException("SendTranmission was called before component was initialized"); - - // Only send message if the connected players mutex isn't locked (prevents potential lag spikes) - // Note that there's still a chance it will be locked after this check and before the broadcast happens, but it should (hopefully) lower lag spikes significantly - if (p2p.ConnectedPlayersMtx.IsLocked) - return; + throw new InvalidOperationException("SendState was called before component was initialized"); var surfaceType = listener.CurrentSurfaceType; bool wearingShoes = SkinManager.IsWearingSkin(Items.Shoes); diff --git a/JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs b/JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs index ec6663e..2b4f0c1 100644 --- a/JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs +++ b/JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs @@ -113,6 +113,7 @@ public override void Initialize() GameEvents.GameUpdate += gameTime => { SteamClient.RunCallbacks(); + InputManager.Update(gameTime); }; diff --git a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/DisconnectedMessageHandler.cs b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/DisconnectedMessageHandler.cs index c156bac..1b16c27 100644 --- a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/DisconnectedMessageHandler.cs +++ b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/DisconnectedMessageHandler.cs @@ -7,8 +7,10 @@ internal class DisconnectedMessageHandler : IMessageHandler + { + context.P2PManager.Disconnect(message.Sender); + }); } } } \ No newline at end of file diff --git a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeRequestHandler.cs b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeRequestHandler.cs index a4c48b2..2a8cfdd 100644 --- a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeRequestHandler.cs +++ b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeRequestHandler.cs @@ -12,7 +12,7 @@ internal class HandshakeRequestHandler : IMessageHandler(); - public Task HandleMessage(HandshakeRequest message, Context context) + public async Task HandleMessage(HandshakeRequest message, Context context) { var authResult = SteamUser.BeginAuthSession(message.AuthSessionTicket, message.Sender); @@ -26,25 +26,31 @@ public Task HandleMessage(HandshakeRequest message, Context context) ErrorMessage = $"Steam auth result = {authResult}" }); - context.P2PManager.Disconnect(message.Sender); - return Task.CompletedTask; + await context.P2PManager.ExecuteOnGameThread(() => + { + context.P2PManager.Disconnect(message.Sender); + }); + return; } - var plrListener = EntityManager.instance.Find().PlayerListener; - var playerState = new PlayerStateChanged - { - Position = plrListener.Position, - State = plrListener.CurrentState, - WalkDirection = (sbyte)plrListener.WalkDirection - }; + PlayerStateChanged? playerState = null; + await context.P2PManager.ExecuteOnGameThread(() => + { + var plrListener = EntityManager.instance.Find().PlayerListener; + playerState = new PlayerStateChanged + { + Position = plrListener.Position, + State = plrListener.CurrentState, + WalkDirection = (sbyte)plrListener.WalkDirection + }; + }); + context.Messages.Send(message.Sender, new HandshakeResponse { Success = true, PlayerState = playerState }); - - return Task.CompletedTask; } } } \ No newline at end of file diff --git a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeResponseHandler.cs b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeResponseHandler.cs index b0e766a..e419fa6 100644 --- a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeResponseHandler.cs +++ b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/HandshakeResponseHandler.cs @@ -26,8 +26,12 @@ public async Task HandleMessage(HandshakeResponse message, Context context) throw new NotImplementedException("User info is null, this shouldn't happen"); using var guard = await context.P2PManager.ConnectedPlayersMtx.LockAsync(); - RemotePlayer player = guard.Value[message.Sender]; - player.InitializeFromHandshakeResponse(message, userInfo.Value); + + await context.P2PManager.ExecuteOnGameThread(() => + { + RemotePlayer player = guard.Value[message.Sender]; + player.InitializeFromHandshakeResponse(message, userInfo.Value); + }); } } } diff --git a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/LocalChatMessageHandler.cs b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/LocalChatMessageHandler.cs index 4033383..8698136 100644 --- a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/LocalChatMessageHandler.cs +++ b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/LocalChatMessageHandler.cs @@ -41,7 +41,10 @@ public async Task HandleMessage(LocalChatMessage message, Context context) Logger.Information("[{channel}, {senderId}] {senderName}: {message}", ChatChannel.Local, message.Sender, senderName, trimmedMessage); - context.P2PManager.Events.OnIncomingChatMessage(new ChatMessage(ChatChannel.Local, message.Sender.Value, senderName, trimmedMessage)); + await context.P2PManager.ExecuteOnGameThread(() => + { + context.P2PManager.Events.OnIncomingChatMessage(new ChatMessage(ChatChannel.Local, message.Sender.Value, senderName, trimmedMessage)); + }); } } } \ No newline at end of file diff --git a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/PlayerStateChangedHandler.cs b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/PlayerStateChangedHandler.cs index 5840896..4a0dc02 100644 --- a/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/PlayerStateChangedHandler.cs +++ b/JKMP.Plugin.Multiplayer/Networking/Messages/Handlers/PlayerStateChangedHandler.cs @@ -11,15 +11,18 @@ internal class PlayerStateChangedHandler : IMessageHandler + { + using var guard = context.P2PManager.ConnectedPlayersMtx.Lock(); - if (!guard.Value.TryGetValue(message.Sender, out var player)) - return; + if (!guard.Value.TryGetValue(message.Sender, out var player)) + return; - if (player.State != PlayerNetworkState.Connected) - return; - - player.UpdateFromState(message); + if (player.State != PlayerNetworkState.Connected) + return; + + player.UpdateFromState(message); + }); } } } \ No newline at end of file diff --git a/JKMP.Plugin.Multiplayer/Networking/P2PManager.cs b/JKMP.Plugin.Multiplayer/Networking/P2PManager.cs index cfcbb1f..63b049a 100644 --- a/JKMP.Plugin.Multiplayer/Networking/P2PManager.cs +++ b/JKMP.Plugin.Multiplayer/Networking/P2PManager.cs @@ -27,7 +27,7 @@ public class P2PManager : IDisposable private static readonly ILogger Logger = LogManager.CreateLogger(); - private readonly Queue pendingGameThreadActions = new(); + private readonly Queue<(Action action, TaskCompletionSource tcs)> pendingGameThreadActions = new(); private readonly HashSet<(DateTime, SteamId)> recentlyDisconnectedPeers = new(); @@ -177,7 +177,7 @@ private async Task ProcessMessages() if (message is not PlayerStateChanged) Logger.Verbose("Incoming message {message}", message.GetType().Name); - processor.PushMessage(message, context); + await processor.HandleMessage(message, context); } Logger.Verbose("Finished message processing"); @@ -190,19 +190,23 @@ private async Task ProcessMessages() } /// - /// Executes the action on the game thread on the next update. + /// Executes the action on the game thread on the next update and waits for it to be executed. /// - public void ExecuteOnGameThread(Action action) + public async Task ExecuteOnGameThread(Action action) { if (action == null) throw new ArgumentNullException(nameof(action)); - pendingGameThreadActions.Enqueue(action); + var tcs = new TaskCompletionSource(false); + pendingGameThreadActions.Enqueue((action, tcs)); + await tcs.Task; } public void Update(float delta) { while (pendingGameThreadActions.Count > 0) { - pendingGameThreadActions.Dequeue()(); + (Action action, TaskCompletionSource tcs) = pendingGameThreadActions.Dequeue(); + action(); + tcs.SetResult(true); } if (recentlyDisconnectedPeers.Count > 0) @@ -220,8 +224,6 @@ public void Update(float delta) recentlyDisconnectedPeers.RemoveWhere(item => toRemove.Contains(item)); } - - processor.HandlePendingMessages().Wait(); } internal void Broadcast(GameMessage message, P2PSend sendType = P2PSend.Reliable) => BroadcastAsync(message, sendType).Wait(); diff --git a/JKMP.Plugin.Multiplayer/Steam/SteamManager.cs b/JKMP.Plugin.Multiplayer/Steam/SteamManager.cs index 20c261f..15d73da 100644 --- a/JKMP.Plugin.Multiplayer/Steam/SteamManager.cs +++ b/JKMP.Plugin.Multiplayer/Steam/SteamManager.cs @@ -15,7 +15,7 @@ public static bool InitializeSteam() { try { - SteamClient.Init(1061090, true); + SteamClient.Init(1061090, asyncCallbacks: false); } catch (Exception ex) { diff --git a/JKMP.Plugin.Multiplayer/Threading/Mutex.cs b/JKMP.Plugin.Multiplayer/Threading/Mutex.cs index 3b9b752..ca8321b 100644 --- a/JKMP.Plugin.Multiplayer/Threading/Mutex.cs +++ b/JKMP.Plugin.Multiplayer/Threading/Mutex.cs @@ -14,6 +14,8 @@ public class Mutex { private T value; private readonly SemaphoreSlim semaphore; + + private StackTrace? lastLockTrace; public Mutex(T value) { @@ -25,13 +27,27 @@ public Mutex(T value) public async Task> LockAsync() { + if (semaphore.CurrentCount == 0) + { + LogManager.TempLogger.Verbose("last: {stackTrace}", lastLockTrace); + LogManager.TempLogger.Verbose("new: {stackTrace}", new StackTrace().ToString()); + } + await semaphore.WaitAsync(); + lastLockTrace = new(); return new MutexGuard(semaphore, value); } public MutexGuard Lock() { + if (semaphore.CurrentCount == 0) + { + LogManager.TempLogger.Verbose("last: {stackTrace}", lastLockTrace); + LogManager.TempLogger.Verbose("new: {stackTrace}", new StackTrace().ToString()); + } + semaphore.Wait(); + lastLockTrace = new(); return new MutexGuard(semaphore, value); } From 469566c0e48c14edd673a2490828245c5feecded Mon Sep 17 00:00:00 2001 From: Skipcast Date: Sun, 16 Jan 2022 23:38:56 +0100 Subject: [PATCH 2/2] Fixed p2p related crash when multiple messages are received in a short amount of time --- JKMP.Plugin.Multiplayer/Networking/Framed.cs | 25 ++++++--- .../Networking/ReusableTCS.cs | 52 ------------------- 2 files changed, 19 insertions(+), 58 deletions(-) delete mode 100644 JKMP.Plugin.Multiplayer/Networking/ReusableTCS.cs diff --git a/JKMP.Plugin.Multiplayer/Networking/Framed.cs b/JKMP.Plugin.Multiplayer/Networking/Framed.cs index 3338b83..5262194 100644 --- a/JKMP.Plugin.Multiplayer/Networking/Framed.cs +++ b/JKMP.Plugin.Multiplayer/Networking/Framed.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading; @@ -16,7 +17,8 @@ namespace JKMP.Plugin.Multiplayer.Networking { private bool isRunning = true; - private readonly ReusableTCS tcs = new(); + private TaskCompletionSource? tcs = new(); + private readonly Queue queuedMessages = new(); private readonly TCodec codec; private readonly byte[] sendBuffer = new byte[4096]; @@ -54,8 +56,9 @@ private async Task StartPolling() Logger.Error(ex, "An unhandled exception was raised when decoding message from {steamId}", packet.SteamId); continue; } - - await tcs.SetResult(message); + + queuedMessages.Enqueue(message); + tcs?.TrySetResult(true); } await Task.Delay(33).ConfigureAwait(false); // poll around 30 times per second @@ -73,9 +76,19 @@ private async Task StartPolling() if (!isRunning) return default; - await tcs; - TData? result = tcs.GetResult(); - tcs.Reset(); + if (tcs != null) + { + await tcs.Task; + tcs = null; + } + + TData result = queuedMessages.Dequeue(); + + if (queuedMessages.Count == 0) + { + tcs = new(); + } + return result; } diff --git a/JKMP.Plugin.Multiplayer/Networking/ReusableTCS.cs b/JKMP.Plugin.Multiplayer/Networking/ReusableTCS.cs deleted file mode 100644 index 93ca910..0000000 --- a/JKMP.Plugin.Multiplayer/Networking/ReusableTCS.cs +++ /dev/null @@ -1,52 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading.Tasks; - -namespace JKMP.Plugin.Multiplayer.Networking -{ - /// - /// A reusable task completion source - /// - internal class ReusableTCS : INotifyCompletion - { - private bool isCompleted; - private T? result; - - public bool IsCompleted => isCompleted; - - private Action? onCompleted; - - public void OnCompleted(Action continuation) - { - if (onCompleted != null) - throw new NotSupportedException("Multiple awaiters is not supported"); - - onCompleted = continuation; - } - - public void Reset() - { - isCompleted = false; - result = default; - onCompleted = null; - } - - public async Task SetResult(T? result) - { - if (isCompleted) - throw new InvalidOperationException("Result is already set. Make sure to call Reset before calling SetResult again."); - - this.result = result; - isCompleted = true; - - if (onCompleted != null) - await Task.Run(onCompleted).ConfigureAwait(false); - } - - public ReusableTCS GetAwaiter() => this; - - public T? GetResult() => result; - } -} \ No newline at end of file