Skip to content

Commit

Permalink
Route packets in the same order they were received
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-r-elp committed Dec 30, 2023
1 parent 8e2bec1 commit 1c0f4c0
Showing 1 changed file with 111 additions and 42 deletions.
153 changes: 111 additions & 42 deletions BeatTogether.DedicatedServer.Kernel/PacketSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public PacketSource(
InstanceConfiguration instconfiguration,
LiteNetConfiguration configuration,
LiteNetServer server)
: base (
: base(
configuration,
server)
{
Expand Down Expand Up @@ -105,12 +105,12 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
uint length;
try { length = HandleRead.ReadVarUInt(); _logger.Verbose($"HandleRead read length {length} cast {(int)length} available bytes {HandleRead.RemainingSize}"); }
catch (EndOfBufferException) { _logger.Warning("Packet was an incorrect length"); goto RoutePacket; }

if (length < HandleRead.RemainingSize)
{
_logger.Information($"Packet might be a MultiPacket length {length} remainingbytes {HandleRead.RemainingSize}");
}

if (HandleRead.RemainingSize < length)
{
_logger.Warning($"Packet fragmented (RemainingSize={HandleRead.RemainingSize}, Expected={length}).");
Expand All @@ -124,7 +124,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
IPacketRegistry packetRegistry = _packetRegistry;

// Initialize packetId
Queue<(byte? basePacketId, string? mpCorePacketId)> packetId = new();
Queue<(byte? basePacketId, string? mpCorePacketId)> packetIds = new();

while (true)
{
Expand All @@ -136,14 +136,14 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
catch (EndOfBufferException) { _logger.Warning("Packet was an incorrect length"); goto RoutePacket; }
if (packetRegistry.TryCreatePacket(checkPacketId.basePacketId, out packet))
{
packetId.Enqueue(checkPacketId);
packetIds.Enqueue(checkPacketId);
lengthToRoute--;
break;
}
if (packetRegistry.TryGetSubPacketRegistry(checkPacketId.basePacketId, out var subPacketRegistry))
{
packetRegistry = subPacketRegistry;
packetId.Enqueue(checkPacketId);
packetIds.Enqueue(checkPacketId);
lengthToRoute--;
continue;
}
Expand All @@ -156,7 +156,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
catch (EndOfBufferException) { _logger.Warning("Packet was an incorrect length"); goto RoutePacket; }
if (MPCoreRegistry.TryCreatePacket(checkPacketId.mpCorePacketId, out packet))
{
packetId.Enqueue(checkPacketId);
packetIds.Enqueue(checkPacketId);
lengthToRoute -= (uint)(HandleRead.Offset - posBeforeStringRead);
break;
}
Expand All @@ -182,7 +182,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
int lengthToRead = (int)lengthToRoute;
var bytesToRead = Math.Min(lengthToRead/* - processedBytes*/, HandleRead.RemainingSize);
var bytes = HandleRead.ReadBytes(bytesToRead);
QueueRoutePacket(sender, routingHeader, ref writer, ref legacyWriter, bytes, lengthToRead, packetId);
QueueRoutePacket(sender, routingHeader, ref writer, ref legacyWriter, bytes, lengthToRead, packetIds);
_logger.Verbose(
$"Attempting to Route unknown packet from {sender.ConnectionId} -> {(routingHeader.ReceiverId == AllConnectionIds ? "all players" : routingHeader.ReceiverId)} " +
$"PacketOption='{routingHeader.PacketOption}' " +
Expand All @@ -201,7 +201,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
}
continue;
}
if(packet is NoteSpawnPacket || packet is ObstacleSpawnPacket || packet is SliderSpawnPacket) //Note packet logic
if (packet is NoteSpawnPacket || packet is ObstacleSpawnPacket || packet is SliderSpawnPacket) //Note packet logic
{
if (_configuration.DisableNotes || (_playerRegistry.GetPlayerCount() > 16) && !_configuration.ForceEnableNotes)
return;
Expand All @@ -212,7 +212,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
{
if ((DateTime.UtcNow.Ticks - sender.TicksAtLastSyncState) / TimeSpan.TicksPerMillisecond < _playerRegistry.GetMillisBetweenSyncStatePackets())
{
_logger.Verbose($"Skipping sync state packet from {sender.ConnectionId} (Secret='{sender.Secret}').");
//_logger.Verbose($"Skipping sync state packet from {sender.ConnectionId} (Secret='{sender.Secret}').");
return;
}
method = DeliveryMethod.Unreliable;
Expand All @@ -222,7 +222,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
{
if ((DateTime.UtcNow.Ticks - sender.TicksAtLastSyncStateDelta) / TimeSpan.TicksPerMillisecond < _playerRegistry.GetMillisBetweenSyncStatePackets())
{
_logger.Verbose($"Skipping sync state packet from {sender.ConnectionId} (Secret='{sender.Secret}').");
//_logger.Verbose($"Skipping sync state packet from {sender.ConnectionId} (Secret='{sender.Secret}').");
return;
}
sender.TicksAtLastSyncStateDelta = DateTime.UtcNow.Ticks;
Expand All @@ -235,7 +235,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
if (packetHandler is null && packet is not IVersionedNetSerializable)
{
//if (!packetType.Name.StartsWith("NodePoseSyncState"))
_logger.Verbose($"No handler exists for packet of type '{packetType.Name}'.");
_logger.Verbose($"No handler exists for packet of type '{packetType.Name}'.");

// Is packet meant to be routed?
if (routingHeader.ReceiverId != 0)
Expand All @@ -244,7 +244,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
int lengthToRead = (int)lengthToRoute;
var bytesToRead = lengthToRead/* - processedBytes*/ /*Math.Min(lengthToRead - processedBytes, HandleRead.RemainingSize)*/;
var bytes = HandleRead.ReadBytes(bytesToRead);
QueueRoutePacket(sender, routingHeader, ref writer, ref legacyWriter, bytes, lengthToRead, packetId);
QueueRoutePacket(sender, routingHeader, ref writer, ref legacyWriter, bytes, lengthToRead, packetIds);
_logger.Verbose(
$"Attempting to Route unhandled packet from {sender.ConnectionId} -> {(routingHeader.ReceiverId == AllConnectionIds ? "all players" : routingHeader.ReceiverId)} " +
$"PacketOption='{routingHeader.PacketOption}' " +
Expand Down Expand Up @@ -284,24 +284,8 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
continue;
}
// Is packet meant to be routed?
(byte SenderId, byte ReceiverId, PacketOption PacketOption) patchedRoutingHeader = routingHeader;
patchedRoutingHeader.SenderId = sender.ConnectionId;
if (routingHeader.ReceiverId == AllConnectionIds)
{
_packetDispatcher.SendExcludingPlayer(sender, versionedPacket, method, patchedRoutingHeader);
}
else if (routingHeader.ReceiverId != 0)
{
if (!_playerRegistry.TryGetPlayer(routingHeader.ReceiverId, out var receiver))
{
_logger.Warning(
"Failed to retrieve receiver " +
$"(SenderIsLegacyPlayer='{senderIsLegacyPlayer}', Secret='{sender.Secret}', ReceiverId={routingHeader.ReceiverId})."
);
return;
}
_packetDispatcher.SendFromPlayerToPlayer(sender, receiver, versionedPacket, method, patchedRoutingHeader);
}
if (routingHeader.ReceiverId != 0)
QueueRoutePacket(sender, routingHeader, ref writer, ref legacyWriter, versionedPacket, (int)length, packetIds);
continue;
}

Expand Down Expand Up @@ -332,7 +316,7 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
}
if (routingHeader.ReceiverId != 0) SendQueue(sender, routingHeader, ref writer, ref legacyWriter, method);
return;
RoutePacket:
RoutePacket:
//Is this packet meant to be routed ?
if (routingHeader.ReceiverId != 0)
{
Expand All @@ -344,9 +328,103 @@ public override void OnReceive(EndPoint remoteEndPoint, ref SpanBuffer reader, D
RoutePacketUnhandled(sender, routingHeader, ref reader, method);
}
}

#region Private Methods

private void QueueRoutePacket(
IPlayer sender, (byte SenderId, byte ReceiverId, PacketOption PacketOption) routingHeader,
ref SpanBuffer writer, ref SpanBuffer legacyWriter, IVersionedNetSerializable packet, int length, Queue<(byte? basePacketId, string? mpCorePacketId)> packetIds)
{
routingHeader.SenderId = sender.ConnectionId;
if (routingHeader.ReceiverId == AllConnectionIds)
{
routingHeader.PacketOption = PacketOption.None;
if (writer.Offset == 0)
{
_logger.Verbose($"Starting Queue for new RoutedPacket");
legacyWriter.WriteLegacyRoutingHeader(routingHeader.SenderId, routingHeader.ReceiverId, routingHeader.PacketOption);
writer.WriteRoutingHeader(routingHeader.SenderId, routingHeader.ReceiverId, routingHeader.PacketOption);
}

writer.WriteVarUInt((uint)length);
legacyWriter.WriteVarUInt((uint)length);
while (packetIds.TryDequeue(out var packetId))
if (packetId.basePacketId.HasValue)
{
writer.WriteUInt8(packetId.basePacketId.Value);
legacyWriter.WriteUInt8(packetId.basePacketId.Value);
}
else if (!string.IsNullOrEmpty(packetId.mpCorePacketId))
{
writer.WriteString(packetId.mpCorePacketId);
legacyWriter.WriteString(packetId.mpCorePacketId);
}
else
throw new ArgumentNullException("PacketId was null");

packet.WriteTo(ref writer, ClientVersions.NewPacketVersion);
packet.WriteTo(ref legacyWriter, ClientVersions.DefaultVersion);

_logger.Verbose(
$"Queueing packet from {routingHeader.SenderId} -> all players " +
$"PacketOption='{routingHeader.PacketOption}' " +
$"(DeliveryMethod={DeliveryMethod.ReliableOrdered})."
);
}
else
{
if (!_playerRegistry.TryGetPlayer(routingHeader.ReceiverId, out var receiver))
{
_logger.Warning(
"QueueRoutePacket: Failed to retrieve receiver " +
$"(Secret='{sender.Secret}', ReceiverId={routingHeader.ReceiverId})."
);
return;
}

if (writer.Offset == 0)
{
_logger.Verbose($"Starting Queue for new RoutedPacket");
if (receiver.ClientVersion < ClientVersions.NewPacketVersion)
legacyWriter.WriteLegacyRoutingHeader(routingHeader.SenderId, routingHeader.ReceiverId, routingHeader.PacketOption);
else
writer.WriteRoutingHeader(routingHeader.SenderId, routingHeader.ReceiverId, routingHeader.PacketOption);
}

_logger.Verbose(
$"Queueing packet from {routingHeader.SenderId} -> {routingHeader.ReceiverId} " +
$"PacketOption='{routingHeader.PacketOption}' " +
$"(Secret='{receiver.Secret}', DeliveryMethod={DeliveryMethod.ReliableOrdered}).");

if (receiver.ClientVersion < ClientVersions.NewPacketVersion)
legacyWriter.WriteVarUInt((uint)length);
else
writer.WriteVarUInt((uint)length);
while (packetIds.TryDequeue(out var packetId))
if (packetId.basePacketId.HasValue)
{
if (receiver.ClientVersion < ClientVersions.NewPacketVersion)
legacyWriter.WriteUInt8(packetId.basePacketId.Value);
else
writer.WriteUInt8(packetId.basePacketId.Value);
}
else if (!string.IsNullOrEmpty(packetId.mpCorePacketId))
{
if (receiver.ClientVersion < ClientVersions.NewPacketVersion)
legacyWriter.WriteString(packetId.mpCorePacketId);
else
writer.WriteString(packetId.mpCorePacketId);
}
else
throw new ArgumentNullException("PacketId was null");

if (receiver.ClientVersion < ClientVersions.NewPacketVersion)
packet.WriteTo(ref legacyWriter, receiver.ClientVersion);
else
packet.WriteTo(ref writer, receiver.ClientVersion);
}
}

private void QueueRoutePacket(
IPlayer sender, (byte SenderId, byte ReceiverId, PacketOption PacketOption) routingHeader,
ref SpanBuffer writer, ref SpanBuffer legacyWriter, Span<byte> data, int length, Queue<(byte? basePacketId, string? mpCorePacketId)> packetIds)
Expand Down Expand Up @@ -414,8 +492,6 @@ private void QueueRoutePacket(
if (legacyWriter.Offset > 0) legacyWriter.WriteBytes(data);
if (legacyWriter.Offset == 0 && writer.Offset == 0)
_logger.Verbose($"No packets to send");


}

private void SendQueue(IPlayer sender, (byte SenderId, byte ReceiverId, PacketOption PacketOption) routingHeader, ref SpanBuffer writer, ref SpanBuffer legacyWriter, DeliveryMethod deliveryMethod)
Expand Down Expand Up @@ -468,13 +544,6 @@ private void SendQueue(IPlayer sender, (byte SenderId, byte ReceiverId, PacketOp
legacyWriter.Dispose();
}

//private void RoutePacketUnhandled(IPlayer sender,
// (byte SenderId, byte ReceiverId, PacketOption PacketOption) routingHeader,
// ref SpanBuffer writer, DeliveryMethod deliveryMethod, (byte? basePacketId, string? mpCorePacketId) packetId)
//{

//}

private void RoutePacketUnhandled(IPlayer sender,
(byte SenderId, byte ReceiverId, PacketOption PacketOption) routingHeader,
ref SpanBuffer reader, DeliveryMethod deliveryMethod)
Expand Down

0 comments on commit 1c0f4c0

Please sign in to comment.