Skip to content

Commit

Permalink
Merge pull request #14 from Jump-King-Multiplayer/fix/p2p-async
Browse files Browse the repository at this point in the history
Fixed some bugs causing freezing and crashes and made p2p messages be handled asynchronously to prevent lag spikes
  • Loading branch information
Skippeh authored Jan 16, 2022
2 parents 93da891 + 469566c commit f82a250
Show file tree
Hide file tree
Showing 12 changed files with 90 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ protected override void LateUpdate(float delta)
private void SendState()
{
if (body == null || listener == null)
throw new InvalidOperationException("SendTranmission was called before component was initialized");

// Only send message if the connected players mutex isn't locked (prevents potential lag spikes)
// Note that there's still a chance it will be locked after this check and before the broadcast happens, but it should (hopefully) lower lag spikes significantly
if (p2p.ConnectedPlayersMtx.IsLocked)
return;
throw new InvalidOperationException("SendState was called before component was initialized");

var surfaceType = listener.CurrentSurfaceType;
bool wearingShoes = SkinManager.IsWearingSkin(Items.Shoes);
Expand Down
1 change: 1 addition & 0 deletions JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public override void Initialize()
GameEvents.GameUpdate += gameTime =>
{
SteamClient.RunCallbacks();

InputManager.Update(gameTime);
};

Expand Down
25 changes: 19 additions & 6 deletions JKMP.Plugin.Multiplayer/Networking/Framed.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -16,7 +17,8 @@ namespace JKMP.Plugin.Multiplayer.Networking
{
private bool isRunning = true;

private readonly ReusableTCS<TData?> tcs = new();
private TaskCompletionSource<bool>? tcs = new();
private readonly Queue<TData> queuedMessages = new();
private readonly TCodec codec;
private readonly byte[] sendBuffer = new byte[4096];

Expand Down Expand Up @@ -54,8 +56,9 @@ private async Task StartPolling()
Logger.Error(ex, "An unhandled exception was raised when decoding message from {steamId}", packet.SteamId);
continue;
}

await tcs.SetResult(message);

queuedMessages.Enqueue(message);
tcs?.TrySetResult(true);
}

await Task.Delay(33).ConfigureAwait(false); // poll around 30 times per second
Expand All @@ -73,9 +76,19 @@ private async Task StartPolling()
if (!isRunning)
return default;

await tcs;
TData? result = tcs.GetResult();
tcs.Reset();
if (tcs != null)
{
await tcs.Task;
tcs = null;
}

TData result = queuedMessages.Dequeue();

if (queuedMessages.Count == 0)
{
tcs = new();
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ internal class DisconnectedMessageHandler : IMessageHandler<Disconnected, Contex
{
public Task HandleMessage(Disconnected message, Context context)
{
context.P2PManager.Disconnect(message.Sender);
return Task.CompletedTask;
return context.P2PManager.ExecuteOnGameThread(() =>
{
context.P2PManager.Disconnect(message.Sender);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal class HandshakeRequestHandler : IMessageHandler<HandshakeRequest, Conte
{
private static readonly ILogger Logger = LogManager.CreateLogger<HandshakeRequestHandler>();

public Task HandleMessage(HandshakeRequest message, Context context)
public async Task HandleMessage(HandshakeRequest message, Context context)
{
var authResult = SteamUser.BeginAuthSession(message.AuthSessionTicket, message.Sender);

Expand All @@ -26,25 +26,31 @@ public Task HandleMessage(HandshakeRequest message, Context context)
ErrorMessage = $"Steam auth result = {authResult}"
});

context.P2PManager.Disconnect(message.Sender);
return Task.CompletedTask;
await context.P2PManager.ExecuteOnGameThread(() =>
{
context.P2PManager.Disconnect(message.Sender);
});
return;
}

var plrListener = EntityManager.instance.Find<GameEntity>().PlayerListener;
var playerState = new PlayerStateChanged
{
Position = plrListener.Position,
State = plrListener.CurrentState,
WalkDirection = (sbyte)plrListener.WalkDirection
};
PlayerStateChanged? playerState = null;

await context.P2PManager.ExecuteOnGameThread(() =>
{
var plrListener = EntityManager.instance.Find<GameEntity>().PlayerListener;
playerState = new PlayerStateChanged
{
Position = plrListener.Position,
State = plrListener.CurrentState,
WalkDirection = (sbyte)plrListener.WalkDirection
};
});

context.Messages.Send(message.Sender, new HandshakeResponse
{
Success = true,
PlayerState = playerState
});

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ public async Task HandleMessage(HandshakeResponse message, Context context)
throw new NotImplementedException("User info is null, this shouldn't happen");

using var guard = await context.P2PManager.ConnectedPlayersMtx.LockAsync();
RemotePlayer player = guard.Value[message.Sender];
player.InitializeFromHandshakeResponse(message, userInfo.Value);

await context.P2PManager.ExecuteOnGameThread(() =>
{
RemotePlayer player = guard.Value[message.Sender];
player.InitializeFromHandshakeResponse(message, userInfo.Value);
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ public async Task HandleMessage(LocalChatMessage message, Context context)

Logger.Information("[{channel}, {senderId}] {senderName}: {message}", ChatChannel.Local, message.Sender, senderName, trimmedMessage);

context.P2PManager.Events.OnIncomingChatMessage(new ChatMessage(ChatChannel.Local, message.Sender.Value, senderName, trimmedMessage));
await context.P2PManager.ExecuteOnGameThread(() =>
{
context.P2PManager.Events.OnIncomingChatMessage(new ChatMessage(ChatChannel.Local, message.Sender.Value, senderName, trimmedMessage));
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ internal class PlayerStateChangedHandler : IMessageHandler<PlayerStateChanged, C

public async Task HandleMessage(PlayerStateChanged message, Context context)
{
using var guard = await context.P2PManager.ConnectedPlayersMtx.LockAsync();
await context.P2PManager.ExecuteOnGameThread(() =>
{
using var guard = context.P2PManager.ConnectedPlayersMtx.Lock();

if (!guard.Value.TryGetValue(message.Sender, out var player))
return;
if (!guard.Value.TryGetValue(message.Sender, out var player))
return;

if (player.State != PlayerNetworkState.Connected)
return;

player.UpdateFromState(message);
if (player.State != PlayerNetworkState.Connected)
return;

player.UpdateFromState(message);
});
}
}
}
18 changes: 10 additions & 8 deletions JKMP.Plugin.Multiplayer/Networking/P2PManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class P2PManager : IDisposable

private static readonly ILogger Logger = LogManager.CreateLogger<P2PManager>();

private readonly Queue<Action> pendingGameThreadActions = new();
private readonly Queue<(Action action, TaskCompletionSource<bool> tcs)> pendingGameThreadActions = new();

private readonly HashSet<(DateTime, SteamId)> recentlyDisconnectedPeers = new();

Expand Down Expand Up @@ -177,7 +177,7 @@ private async Task ProcessMessages()
if (message is not PlayerStateChanged)
Logger.Verbose("Incoming message {message}", message.GetType().Name);

processor.PushMessage(message, context);
await processor.HandleMessage(message, context);
}

Logger.Verbose("Finished message processing");
Expand All @@ -190,19 +190,23 @@ private async Task ProcessMessages()
}

/// <summary>
/// Executes the action on the game thread on the next update.
/// Executes the action on the game thread on the next update and waits for it to be executed.
/// </summary>
public void ExecuteOnGameThread(Action action)
public async Task ExecuteOnGameThread(Action action)
{
if (action == null) throw new ArgumentNullException(nameof(action));
pendingGameThreadActions.Enqueue(action);
var tcs = new TaskCompletionSource<bool>(false);
pendingGameThreadActions.Enqueue((action, tcs));
await tcs.Task;
}

public void Update(float delta)
{
while (pendingGameThreadActions.Count > 0)
{
pendingGameThreadActions.Dequeue()();
(Action action, TaskCompletionSource<bool> tcs) = pendingGameThreadActions.Dequeue();
action();
tcs.SetResult(true);
}

if (recentlyDisconnectedPeers.Count > 0)
Expand All @@ -220,8 +224,6 @@ public void Update(float delta)

recentlyDisconnectedPeers.RemoveWhere(item => toRemove.Contains(item));
}

processor.HandlePendingMessages().Wait();
}

internal void Broadcast(GameMessage message, P2PSend sendType = P2PSend.Reliable) => BroadcastAsync(message, sendType).Wait();
Expand Down
52 changes: 0 additions & 52 deletions JKMP.Plugin.Multiplayer/Networking/ReusableTCS.cs

This file was deleted.

2 changes: 1 addition & 1 deletion JKMP.Plugin.Multiplayer/Steam/SteamManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public static bool InitializeSteam()
{
try
{
SteamClient.Init(1061090, true);
SteamClient.Init(1061090, asyncCallbacks: false);
}
catch (Exception ex)
{
Expand Down
16 changes: 16 additions & 0 deletions JKMP.Plugin.Multiplayer/Threading/Mutex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public class Mutex<T>
{
private T value;
private readonly SemaphoreSlim semaphore;

private StackTrace? lastLockTrace;

public Mutex(T value)
{
Expand All @@ -25,13 +27,27 @@ public Mutex(T value)

public async Task<MutexGuard<T>> LockAsync()
{
if (semaphore.CurrentCount == 0)
{
LogManager.TempLogger.Verbose("last: {stackTrace}", lastLockTrace);
LogManager.TempLogger.Verbose("new: {stackTrace}", new StackTrace().ToString());
}

await semaphore.WaitAsync();
lastLockTrace = new();
return new MutexGuard<T>(semaphore, value);
}

public MutexGuard<T> Lock()
{
if (semaphore.CurrentCount == 0)
{
LogManager.TempLogger.Verbose("last: {stackTrace}", lastLockTrace);
LogManager.TempLogger.Verbose("new: {stackTrace}", new StackTrace().ToString());
}

semaphore.Wait();
lastLockTrace = new();
return new MutexGuard<T>(semaphore, value);
}

Expand Down

0 comments on commit f82a250

Please sign in to comment.