Skip to content

Commit

Permalink
Fix Unity transport sending corrupted data
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrakena committed Dec 12, 2023
1 parent 5b63b1e commit 2ac121b
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 46 deletions.
6 changes: 2 additions & 4 deletions Common/Networking/Channels/ReliableChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private async ValueTask ProcessAckAsync(NetworkPacket packet)
_pendingPackets[pendingIdx] = p;
if (r)
{
Peer.LogDebug($"[Packet {packet.Sequence}] Correct ack for {pendingSeq}");
//Peer.LogDebug($"[Packet {packet.Sequence}] Correct ack for {pendingSeq}");
}
}
}
Expand Down Expand Up @@ -184,8 +184,6 @@ protected override async Task<bool> FlushQueueAsync()
var p = _pendingPackets[pendingSeq % _windowSize];
var sendSeq = await p.TrySendAsync(currentTime, Peer);
_pendingPackets[pendingSeq % _windowSize] = p;
//var sendSeq = await _pendingPackets[pendingSeq % _windowSize].TrySendAsync(currentTime, Peer);
Peer.LogDebug("Trying to send sequence number " + pendingSeq + ": " + sendSeq);
if (sendSeq)
{
hasPendingPackets = true;
Expand Down Expand Up @@ -295,7 +293,7 @@ public override async ValueTask<bool> HandlePacketAsync(NetworkPacket packet)
//detailed check
if (seq == _remoteSequence)
{
Peer.LogDebug($"[Receive] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})");
//Peer.LogDebug($"[Receive] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})");
await Peer.AddReliablePacket(_deliveryMethod, packet);
_remoteSequence = (_remoteSequence + 1) % NetConstants.MaxSequence;

Expand Down
1 change: 1 addition & 0 deletions Common/Networking/Core/PromulManager.Socket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ internal async ValueTask<int> RawSendAsync(ArraySegment<byte> data, IPEndPoint r
{
result = await socket.SendToAsync(data, SocketFlags.None, remoteEndPoint);
}
catch (NullReferenceException){}
catch (SocketException ex)
{
switch (ex.SocketErrorCode)
Expand Down
2 changes: 1 addition & 1 deletion Common/Networking/Peers/PeerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ internal async Task SendUserData(NetworkPacket packet)
//if (mergedPacketSize + splitThreshold >= MaximumTransferUnit)
//{
await PromulManager.RawSendAsync(packet, EndPoint);
LogDebug($"[Send] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})");
//LogDebug($"[Send] {packet.Property} ({packet.Data.Count} bytes) (sequence {packet.Sequence})");
//}
// if (_mergePos + mergedPacketSize > MaximumTransferUnit) await SendMerged();
//
Expand Down
8 changes: 3 additions & 5 deletions Runtime/PromulTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public async Task SendControl(RelayControlMessage rcm, NetworkDelivery qos)

public override void Send(ulong clientId, ArraySegment<byte> data, NetworkDelivery qos)
{
Debug.Log("Sending to " + clientId + ": " + string.Join(" ", data.Select(e => e.ToString("X2"))));
byte[] sd = data.ToArray();
Task.Run(async () =>
{
await SendControl(new RelayControlMessage
{
Type = RelayControlMessageType.Data,
AuthorClientId = clientId,
Data = data
Data = sd
}, qos);
});
}
Expand All @@ -93,7 +93,6 @@ async ValueTask OnNetworkReceive(PeerBase peer, CompositeReader reader, byte cha
{
var message = reader.ReadRelayControlMessage();
var author = message.AuthorClientId;
Debug.Log($"Receive from {peer.Id} author={author} type={message.Type:G}");
switch (message.Type)
{
// Either we are host and a client has connected,
Expand All @@ -112,10 +111,9 @@ async ValueTask OnNetworkReceive(PeerBase peer, CompositeReader reader, byte cha
// Relayed data
case RelayControlMessageType.Data:
{
Debug.Log("Data: " + string.Join(" ", message.Data.Select(e => e.ToString("X2"))));
var data = new byte[message.Data.Count];
message.Data.CopyTo(data);
_queue.Enqueue((NetworkEvent.Data, author, data));
_queue.Enqueue((NetworkEvent.Data, author, message.Data));
break;
}
case RelayControlMessageType.KickFromRelay:
Expand Down
62 changes: 26 additions & 36 deletions Server~/Relay/Sessions/RelaySession.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
using Promul.Common.Networking;
using Promul.Common.Networking.Data;
using Promul.Common.Structs;

namespace Promul.Server.Relay.Sessions;

public class RelaySession
{
private readonly Dictionary<int, PeerBase> _connections = new();
readonly Dictionary<int, PeerBase> _connections = new Dictionary<int, PeerBase>();
int? host = null;
public PeerBase? HostPeer => host != null ? _connections[host.Value] : null;
public string JoinCode { get; }

private readonly ILogger<RelaySession> _logger;
private readonly RelayServer _server;
private int? host;
readonly ILogger<RelaySession> _logger;
readonly RelayServer _server;

public RelaySession(string joinCode, RelayServer server, ILogger<RelaySession> logger)
{
Expand All @@ -19,27 +20,20 @@ public RelaySession(string joinCode, RelayServer server, ILogger<RelaySession> l
_server = server;
}

public PeerBase? HostPeer => host != null ? _connections[host.Value] : null;
public string JoinCode { get; }

public IEnumerable<PeerBase> Peers => _connections.Values;

public async Task OnReceive(PeerBase from, RelayControlMessage message, DeliveryMethod method)
{
if (!_connections.TryGetValue((int)message.AuthorClientId, out var dest))
if (!_connections.TryGetValue((int) message.AuthorClientId, out var dest))
{
LogInformation(
$"{from.Id} tried to send information to {message.AuthorClientId}, but {message.AuthorClientId} is not connected to the relay.");
LogInformation($"{from.Id} tried to send information to {message.AuthorClientId}, but {message.AuthorClientId} is not connected to the relay.");
return;
}

switch (message.Type)
{
case RelayControlMessageType.Data:
await SendAsync(dest,
new RelayControlMessage
{ Type = RelayControlMessageType.Data, AuthorClientId = (ulong)from.Id, Data = message.Data },
method);
await SendAsync(dest, new RelayControlMessage { Type = RelayControlMessageType.Data, AuthorClientId = (ulong)from.Id, Data = message.Data }, method);
break;
case RelayControlMessageType.KickFromRelay:
var target = message.AuthorClientId;
Expand All @@ -52,27 +46,22 @@ await SendAsync(dest,
LogInformation($"Host {from.Id} successfully kicked {target}");
}
}
else
{
LogInformation($"Client {from.Id} tried to illegally kick {target}!");
}

else LogInformation($"Client {from.Id} tried to illegally kick {target}!");
break;
case RelayControlMessageType.Connected:
case RelayControlMessageType.Disconnected:
default:
LogInformation($"Ignoring invalid message {message.Type:G} from {from.Id}");
break;
}

}

private async Task SendAsync(PeerBase to, RelayControlMessage message, DeliveryMethod method)
{
var writer = CompositeWriter.Create();
writer.Write(message);
_logger.LogInformation(
$"Sending {message.Type} ({message.Data.Count} bytes) from {message.AuthorClientId} to {to.Id}");
await to.SendAsync(writer, method);
await to.SendAsync(writer, deliveryMethod: method);
}

public async Task OnJoinAsync(PeerBase peer)
Expand All @@ -86,18 +75,18 @@ public async Task OnJoinAsync(PeerBase peer)
else
{
LogInformation($"{peer.Id} has joined");

await SendAsync(HostPeer!, new RelayControlMessage
await SendAsync(HostPeer!, new RelayControlMessage()
{
Type = RelayControlMessageType.Connected,
AuthorClientId = (ulong)peer.Id,
AuthorClientId = (ulong) peer.Id,
Data = Array.Empty<byte>()
}, DeliveryMethod.ReliableOrdered);

await SendAsync(peer, new RelayControlMessage
{
Type = RelayControlMessageType.Connected,
AuthorClientId = (ulong)host!,
AuthorClientId = (ulong) host!,
Data = Array.Empty<byte>()
}, DeliveryMethod.ReliableOrdered);
}
Expand All @@ -116,30 +105,31 @@ public async Task OnLeave(PeerBase peer)
await _server.DestroySession(this);
return;
}

if (host != null)
{
await SendAsync(HostPeer!, new RelayControlMessage
{
Type = RelayControlMessageType.Disconnected,
AuthorClientId = (ulong)peer.Id,
AuthorClientId = (ulong) peer.Id,
Data = Array.Empty<byte>()
}, DeliveryMethod.ReliableOrdered);
}
}
}

public async Task DisconnectAll()
{
foreach (var con in _connections.Values) await _server.PromulManager.DisconnectPeerAsync(con);
foreach (var con in _connections.Values)
{
await this._server.PromulManager.DisconnectPeerAsync(con);
}
_connections.Clear();
}

private void LogInformation(string message)
{
_logger.LogInformation("[{}] {}", this, message);
}

public override string ToString()
{
return $"Session {JoinCode}";
}
public override string ToString() => $"Session {this.JoinCode}";
}

0 comments on commit 2ac121b

Please sign in to comment.