Skip to content

Commit

Permalink
rudp deep impl. part 1
Browse files Browse the repository at this point in the history
  • Loading branch information
alec1o committed Jul 2, 2024
1 parent a5a6fdc commit cf11ec1
Showing 1 changed file with 247 additions and 3 deletions.
250 changes: 247 additions & 3 deletions src/rudp/partials/RUDP.ClientTo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@ private class ClientTo : IRUDP.ClientTo
private Socket _socket;
private bool _isOpeningOrClosing, _isClosed;
private readonly bool _isServer;
private int _openTimeout;
private int _openTimeout, _resendDelay;
private MessageId _messageId;
private readonly object _nextIdLock = new object();
private readonly object _queueLock = new object();
private readonly List<(uint id, byte[] data)> _reliableQueueList;
private readonly List<(uint id, byte[] data)> _reliableOrderedQueueList;

private struct Config
{
Expand Down Expand Up @@ -58,7 +61,10 @@ private ClientTo()
_socket = null;
_client = null;
_isServer = false;
_resendDelay = 10;
_openTimeout = 3000;
_reliableQueueList = new List<(uint id, byte[] data)>();
_reliableOrderedQueueList = new List<(uint id, byte[] data)>();
Host = Host.Default;
}

Expand Down Expand Up @@ -143,10 +149,15 @@ ref endpoint
);
}

InitReceiver();

// connected successful
Host = new Host(_socket.RemoteEndPoint);

_messageId = new MessageId();
lock (_nextIdLock)
{
_messageId = new MessageId();
}

_isClosed = false;

Expand Down Expand Up @@ -262,15 +273,32 @@ private void Send(byte[] bytes, MessageType messageType)

private void Send(Host host, ref byte[] bytes, MessageType messageType)
{
var packageId = GetNextId(messageType);
var package = new Package
{
Type = (sbyte)messageType,
Data = bytes,
Id = GetNextId(messageType)
Id = packageId
};

GetBufferFromPackage(ref package, out var buffer);

if (buffer.Length <= 0) return;

switch (messageType)
{
case MessageType.Reliable:
{
_reliableQueueList.Add((packageId, buffer));
break;
}
case MessageType.ReliableUnordered:
{
_reliableOrderedQueueList.Add((packageId, buffer));
break;
}
}

SendRaw(host, ref buffer);
}

Expand Down Expand Up @@ -337,11 +365,33 @@ public void SetOpenTimeout(int value)
_openTimeout = value;
}

public void SetResendDelay(int value)
{
if (IsOpened)
throw new Exception
(
$"Isn't possible use `{nameof(SetResendDelay)}` while socket is already connected."
);

if (value <= 0)
throw new Exception
(
$"Isn't possible use {nameof(SetResendDelay)} with a `0` or `negative` value"
);

_resendDelay = value;
}

public int GetOpenTimeout()
{
return _openTimeout;
}

public int GetResendDelay()
{
return _resendDelay;
}

private uint GetNextId(MessageType messageType)
{
lock (_nextIdLock)
Expand Down Expand Up @@ -385,6 +435,200 @@ private void GetBufferFromPackage(ref Package package, out byte[] buffer)

primitive.Reset();
}

private void PushResult(ref byte[] bytes, MessageType messageType)
{
(string name, byte[] buffer) content = NetlyEnvironment.EventManager.Verify(bytes);

if (content.buffer == null)
On.OnData?.Invoke(null, (bytes, messageType));
else
On.OnEvent?.Invoke(null, (content.name, content.buffer, messageType));
}

private void InitReceiver()
{
var endpoint = Host.EndPoint;

var buffer = new byte
[
// Maximum/Default receive buffer length.
(int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer)
];

ReceiverUpdate();

Task.Run(() =>
{
while (IsOpened)
{
ReliableQueueTask();
Thread.Sleep(_resendDelay);
}
});

void ReceiverUpdate()
{
if (!IsOpened)
{
Close();
return;
}

_socket.BeginReceiveFrom
(
buffer,
0,
buffer.Length,
SocketFlags.None,
ref endpoint,
ReceiveCallback,
null
);
}

void ReceiveCallback(IAsyncResult result)
{
try
{
var size = _socket.EndReceiveFrom(result, ref endpoint);

if (size <= 0)
{
if (IsOpened)
ReceiverUpdate();
else
Close();
return;
}

byte[] data = null;

if (size == 1)
{
// optimize 1 byte (ping, connection ack)
data = new[] { buffer[0] };
}
else if (size == 5)
{
// optimize 5 bytes (reliable ack)
data = new[] { buffer[0], buffer[1], buffer[2], buffer[3], buffer[4] };
}
else
{
// read any data
data = new byte[size];
Array.Copy(buffer, 0, data, 0, data.Length);
}

ProcessBuffer(ref data);

ReceiverUpdate();
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
Close();
}
}
}

private void ProcessBuffer(ref byte[] data)
{
if (data.Length == 1)
{
return;
}

Primitive primitive = new Primitive(data);
var package = primitive.Get.Class<Package>();

// invalid package
if (!primitive.IsValid)
{
primitive.Reset();
return;
}

MessageType messageType = (MessageType)package.Type;

switch (messageType)
{
case MessageType.Unreliable:
{
var bytes = package.Data;
PushResult(ref bytes, MessageType.Unreliable);
break;
}
case MessageType.UnreliableOrdered:
{
// TODO:
break;
}
case MessageType.Reliable:
{
// TODO:
break;
}
case MessageType.ReliableUnordered:
{
// TODO:
break;
}
default:
{
throw new Exception($"{messageType} isn't supported.");
}
}
}

private void ReliableQueueTask()
{
try
{
if (_reliableQueueList.Count > 0)
{
(uint id, byte[] data)[] array;

lock (_queueLock)
{
array = _reliableQueueList.ToArray();
}

if (array.Length > 0)
{
foreach (var value in array)
{
var package = value;
SendRaw(Host, ref package.data);
}
}
}

if (_reliableOrderedQueueList.Count > 0)
{
(uint id, byte[] data)[] array;

lock (_queueLock)
{
array = _reliableOrderedQueueList.ToArray();
}

if (array.Length > 0)
{
foreach (var value in array)
{
var package = value;
SendRaw(Host, ref package.data);
}
}
}
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
}
}
}
}
}
Expand Down

0 comments on commit cf11ec1

Please sign in to comment.