Skip to content

Commit

Permalink
Merge pull request #15 from Jump-King-Multiplayer/development
Browse files Browse the repository at this point in the history
Merge development into main
  • Loading branch information
Skippeh authored Jan 16, 2022
2 parents 708e845 + f82a250 commit 4179a88
Show file tree
Hide file tree
Showing 23 changed files with 251 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,7 @@ protected override void LateUpdate(float delta)
private void SendState()
{
if (body == null || listener == null)
throw new InvalidOperationException("SendTranmission was called before component was initialized");

// Only send message if the connected players mutex isn't locked (prevents potential lag spikes)
// Note that there's still a chance it will be locked after this check and before the broadcast happens, but it should (hopefully) lower lag spikes significantly
if (p2p.ConnectedPlayersMtx.IsLocked)
return;
throw new InvalidOperationException("SendState was called before component was initialized");

var surfaceType = listener.CurrentSurfaceType;
bool wearingShoes = SkinManager.IsWearingSkin(Items.Shoes);
Expand Down
4 changes: 4 additions & 0 deletions JKMP.Plugin.Multiplayer/Game/Entities/GameEntity.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class GameEntity : BaseManagerEntity

private LocalPlayerListener plrListener = null!;
private Chat chatWidget = null!;
private StatusPanel statusPanel = null!;

private float timeSincePositionUpdate;
private const float PositionUpdateInterval = 30; // Send a position update every 30 seconds
Expand All @@ -43,6 +44,7 @@ protected override void OnFirstUpdate()
chatWidget = UIManager.AddWidget(new Chat());
localPlayer = EntityManager.instance.Find<PlayerEntity>();
localPlayer.AddComponents(new PlayerStateTransmitter(P2P), new AudioListenerComponent());
statusPanel = UIManager.AddWidget(new StatusPanel());

MatchmakingManager.Instance.Events.NearbyClientsReceived += OnNearbyClientsReceived;
}
Expand All @@ -55,6 +57,7 @@ protected override void OnDestroy()
plrListener.Dispose();
P2P.Dispose();
UIManager.RemoveWidget(chatWidget);
UIManager.RemoveWidget(statusPanel);
}

private void OnNearbyClientsReceived(ICollection<ulong> steamIds)
Expand All @@ -78,6 +81,7 @@ protected override void Update(float delta)
P2P.Update(delta);
Sound.Update(delta);
chatWidget.Update(delta);
statusPanel.Update(delta);
}
}
}
70 changes: 70 additions & 0 deletions JKMP.Plugin.Multiplayer/Game/UI/Widgets/StatusPanel.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using JKMP.Plugin.Multiplayer.Matchmaking;
using JKMP.Plugin.Multiplayer.Networking;
using JumpKing.PauseMenu;
using Matchmaking.Client.EventData;
using Myra.Graphics2D.UI;

namespace JKMP.Plugin.Multiplayer.Game.UI.Widgets
{
public class StatusPanel : ResourceWidget<StatusPanel>
{
public string TotalPlayers
{
get => totalPlayers.Text;
set => totalPlayers.Text = value;
}

public string GroupPlayers
{
get => groupPlayers.Text;
set => groupPlayers.Text = value;
}

public bool Connected
{
get => connected;
set
{
if (value == connected)
return;

connected = value;

connectedContainer.Visible = connected;
disconnectedContainer.Visible = !connected;
}
}

private readonly Label totalPlayers;
private readonly Label groupPlayers;
private readonly Widget connectedContainer;
private readonly Widget disconnectedContainer;
private bool connected;

public StatusPanel() : base("UI/Status/StatusPanel.xmmp")
{
totalPlayers = EnsureWidgetById<Label>("TotalPlayers");
groupPlayers = EnsureWidgetById<Label>("GroupPlayers");
connectedContainer = EnsureWidgetById<Widget>("ConnectedContainer");
disconnectedContainer = EnsureWidgetById<Widget>("DisconnectedContainer");

connectedContainer.Visible = false;

AcceptsKeyboardFocus = false;

MatchmakingManager.Instance.Events.ServerStatusUpdateReceived += OnServerStatusReceived;
}

private void OnServerStatusReceived(ServerStatus status)
{
TotalPlayers = status.TotalPlayers.ToString();
GroupPlayers = status.GroupPlayers.ToString();
}

public void Update(float delta)
{
Visible = PauseManager.instance.IsPaused;
Connected = MatchmakingManager.Instance.IsConnected;
}
}
}
1 change: 1 addition & 0 deletions JKMP.Plugin.Multiplayer/MultiplayerPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public override void Initialize()
GameEvents.GameUpdate += gameTime =>
{
SteamClient.RunCallbacks();

InputManager.Update(gameTime);
};

Expand Down
25 changes: 19 additions & 6 deletions JKMP.Plugin.Multiplayer/Networking/Framed.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
Expand All @@ -16,7 +17,8 @@ namespace JKMP.Plugin.Multiplayer.Networking
{
private bool isRunning = true;

private readonly ReusableTCS<TData?> tcs = new();
private TaskCompletionSource<bool>? tcs = new();
private readonly Queue<TData> queuedMessages = new();
private readonly TCodec codec;
private readonly byte[] sendBuffer = new byte[4096];

Expand Down Expand Up @@ -54,8 +56,9 @@ private async Task StartPolling()
Logger.Error(ex, "An unhandled exception was raised when decoding message from {steamId}", packet.SteamId);
continue;
}

await tcs.SetResult(message);

queuedMessages.Enqueue(message);
tcs?.TrySetResult(true);
}

await Task.Delay(33).ConfigureAwait(false); // poll around 30 times per second
Expand All @@ -73,9 +76,19 @@ private async Task StartPolling()
if (!isRunning)
return default;

await tcs;
TData? result = tcs.GetResult();
tcs.Reset();
if (tcs != null)
{
await tcs.Task;
tcs = null;
}

TData result = queuedMessages.Dequeue();

if (queuedMessages.Count == 0)
{
tcs = new();
}

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ internal class DisconnectedMessageHandler : IMessageHandler<Disconnected, Contex
{
public Task HandleMessage(Disconnected message, Context context)
{
context.P2PManager.Disconnect(message.Sender);
return Task.CompletedTask;
return context.P2PManager.ExecuteOnGameThread(() =>
{
context.P2PManager.Disconnect(message.Sender);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ internal class HandshakeRequestHandler : IMessageHandler<HandshakeRequest, Conte
{
private static readonly ILogger Logger = LogManager.CreateLogger<HandshakeRequestHandler>();

public Task HandleMessage(HandshakeRequest message, Context context)
public async Task HandleMessage(HandshakeRequest message, Context context)
{
var authResult = SteamUser.BeginAuthSession(message.AuthSessionTicket, message.Sender);

Expand All @@ -26,25 +26,31 @@ public Task HandleMessage(HandshakeRequest message, Context context)
ErrorMessage = $"Steam auth result = {authResult}"
});

context.P2PManager.Disconnect(message.Sender);
return Task.CompletedTask;
await context.P2PManager.ExecuteOnGameThread(() =>
{
context.P2PManager.Disconnect(message.Sender);
});
return;
}

var plrListener = EntityManager.instance.Find<GameEntity>().PlayerListener;
var playerState = new PlayerStateChanged
{
Position = plrListener.Position,
State = plrListener.CurrentState,
WalkDirection = (sbyte)plrListener.WalkDirection
};
PlayerStateChanged? playerState = null;

await context.P2PManager.ExecuteOnGameThread(() =>
{
var plrListener = EntityManager.instance.Find<GameEntity>().PlayerListener;
playerState = new PlayerStateChanged
{
Position = plrListener.Position,
State = plrListener.CurrentState,
WalkDirection = (sbyte)plrListener.WalkDirection
};
});

context.Messages.Send(message.Sender, new HandshakeResponse
{
Success = true,
PlayerState = playerState
});

return Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ public async Task HandleMessage(HandshakeResponse message, Context context)
throw new NotImplementedException("User info is null, this shouldn't happen");

using var guard = await context.P2PManager.ConnectedPlayersMtx.LockAsync();
RemotePlayer player = guard.Value[message.Sender];
player.InitializeFromHandshakeResponse(message, userInfo.Value);

await context.P2PManager.ExecuteOnGameThread(() =>
{
RemotePlayer player = guard.Value[message.Sender];
player.InitializeFromHandshakeResponse(message, userInfo.Value);
});
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ public async Task HandleMessage(LocalChatMessage message, Context context)

Logger.Information("[{channel}, {senderId}] {senderName}: {message}", ChatChannel.Local, message.Sender, senderName, trimmedMessage);

context.P2PManager.Events.OnIncomingChatMessage(new ChatMessage(ChatChannel.Local, message.Sender.Value, senderName, trimmedMessage));
await context.P2PManager.ExecuteOnGameThread(() =>
{
context.P2PManager.Events.OnIncomingChatMessage(new ChatMessage(ChatChannel.Local, message.Sender.Value, senderName, trimmedMessage));
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@ internal class PlayerStateChangedHandler : IMessageHandler<PlayerStateChanged, C

public async Task HandleMessage(PlayerStateChanged message, Context context)
{
using var guard = await context.P2PManager.ConnectedPlayersMtx.LockAsync();
await context.P2PManager.ExecuteOnGameThread(() =>
{
using var guard = context.P2PManager.ConnectedPlayersMtx.Lock();

if (!guard.Value.TryGetValue(message.Sender, out var player))
return;
if (!guard.Value.TryGetValue(message.Sender, out var player))
return;

if (player.State != PlayerNetworkState.Connected)
return;

player.UpdateFromState(message);
if (player.State != PlayerNetworkState.Connected)
return;

player.UpdateFromState(message);
});
}
}
}
18 changes: 10 additions & 8 deletions JKMP.Plugin.Multiplayer/Networking/P2PManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class P2PManager : IDisposable

private static readonly ILogger Logger = LogManager.CreateLogger<P2PManager>();

private readonly Queue<Action> pendingGameThreadActions = new();
private readonly Queue<(Action action, TaskCompletionSource<bool> tcs)> pendingGameThreadActions = new();

private readonly HashSet<(DateTime, SteamId)> recentlyDisconnectedPeers = new();

Expand Down Expand Up @@ -177,7 +177,7 @@ private async Task ProcessMessages()
if (message is not PlayerStateChanged)
Logger.Verbose("Incoming message {message}", message.GetType().Name);

processor.PushMessage(message, context);
await processor.HandleMessage(message, context);
}

Logger.Verbose("Finished message processing");
Expand All @@ -190,19 +190,23 @@ private async Task ProcessMessages()
}

/// <summary>
/// Executes the action on the game thread on the next update.
/// Executes the action on the game thread on the next update and waits for it to be executed.
/// </summary>
public void ExecuteOnGameThread(Action action)
public async Task ExecuteOnGameThread(Action action)
{
if (action == null) throw new ArgumentNullException(nameof(action));
pendingGameThreadActions.Enqueue(action);
var tcs = new TaskCompletionSource<bool>(false);
pendingGameThreadActions.Enqueue((action, tcs));
await tcs.Task;
}

public void Update(float delta)
{
while (pendingGameThreadActions.Count > 0)
{
pendingGameThreadActions.Dequeue()();
(Action action, TaskCompletionSource<bool> tcs) = pendingGameThreadActions.Dequeue();
action();
tcs.SetResult(true);
}

if (recentlyDisconnectedPeers.Count > 0)
Expand All @@ -220,8 +224,6 @@ public void Update(float delta)

recentlyDisconnectedPeers.RemoveWhere(item => toRemove.Contains(item));
}

processor.HandlePendingMessages().Wait();
}

internal void Broadcast(GameMessage message, P2PSend sendType = P2PSend.Reliable) => BroadcastAsync(message, sendType).Wait();
Expand Down
Loading

0 comments on commit 4179a88

Please sign in to comment.