diff --git a/src/rudp/partials/RUDP.ClientTo.cs b/src/rudp/partials/RUDP.ClientTo.cs index cdfd745..d195f5c 100644 --- a/src/rudp/partials/RUDP.ClientTo.cs +++ b/src/rudp/partials/RUDP.ClientTo.cs @@ -22,9 +22,12 @@ private class ClientTo : IRUDP.ClientTo private Socket _socket; private bool _isOpeningOrClosing, _isClosed; private readonly bool _isServer; - private int _openTimeout; + private int _openTimeout, _resendDelay; private MessageId _messageId; private readonly object _nextIdLock = new object(); + private readonly object _queueLock = new object(); + private readonly List<(uint id, byte[] data)> _reliableQueueList; + private readonly List<(uint id, byte[] data)> _reliableOrderedQueueList; private struct Config { @@ -58,7 +61,10 @@ private ClientTo() _socket = null; _client = null; _isServer = false; + _resendDelay = 10; _openTimeout = 3000; + _reliableQueueList = new List<(uint id, byte[] data)>(); + _reliableOrderedQueueList = new List<(uint id, byte[] data)>(); Host = Host.Default; } @@ -143,10 +149,15 @@ ref endpoint ); } + InitReceiver(); + // connected successful Host = new Host(_socket.RemoteEndPoint); - _messageId = new MessageId(); + lock (_nextIdLock) + { + _messageId = new MessageId(); + } _isClosed = false; @@ -262,15 +273,32 @@ private void Send(byte[] bytes, MessageType messageType) private void Send(Host host, ref byte[] bytes, MessageType messageType) { + var packageId = GetNextId(messageType); var package = new Package { Type = (sbyte)messageType, Data = bytes, - Id = GetNextId(messageType) + Id = packageId }; GetBufferFromPackage(ref package, out var buffer); + if (buffer.Length <= 0) return; + + switch (messageType) + { + case MessageType.Reliable: + { + _reliableQueueList.Add((packageId, buffer)); + break; + } + case MessageType.ReliableUnordered: + { + _reliableOrderedQueueList.Add((packageId, buffer)); + break; + } + } + SendRaw(host, ref buffer); } @@ -337,11 +365,33 @@ public void SetOpenTimeout(int value) _openTimeout = value; } + public void SetResendDelay(int value) + { + if (IsOpened) + throw new Exception + ( + $"Isn't possible use `{nameof(SetResendDelay)}` while socket is already connected." + ); + + if (value <= 0) + throw new Exception + ( + $"Isn't possible use {nameof(SetResendDelay)} with a `0` or `negative` value" + ); + + _resendDelay = value; + } + public int GetOpenTimeout() { return _openTimeout; } + public int GetResendDelay() + { + return _resendDelay; + } + private uint GetNextId(MessageType messageType) { lock (_nextIdLock) @@ -385,6 +435,200 @@ private void GetBufferFromPackage(ref Package package, out byte[] buffer) primitive.Reset(); } + + private void PushResult(ref byte[] bytes, MessageType messageType) + { + (string name, byte[] buffer) content = NetlyEnvironment.EventManager.Verify(bytes); + + if (content.buffer == null) + On.OnData?.Invoke(null, (bytes, messageType)); + else + On.OnEvent?.Invoke(null, (content.name, content.buffer, messageType)); + } + + private void InitReceiver() + { + var endpoint = Host.EndPoint; + + var buffer = new byte + [ + // Maximum/Default receive buffer length. + (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer) + ]; + + ReceiverUpdate(); + + Task.Run(() => + { + while (IsOpened) + { + ReliableQueueTask(); + Thread.Sleep(_resendDelay); + } + }); + + void ReceiverUpdate() + { + if (!IsOpened) + { + Close(); + return; + } + + _socket.BeginReceiveFrom + ( + buffer, + 0, + buffer.Length, + SocketFlags.None, + ref endpoint, + ReceiveCallback, + null + ); + } + + void ReceiveCallback(IAsyncResult result) + { + try + { + var size = _socket.EndReceiveFrom(result, ref endpoint); + + if (size <= 0) + { + if (IsOpened) + ReceiverUpdate(); + else + Close(); + return; + } + + byte[] data = null; + + if (size == 1) + { + // optimize 1 byte (ping, connection ack) + data = new[] { buffer[0] }; + } + else if (size == 5) + { + // optimize 5 bytes (reliable ack) + data = new[] { buffer[0], buffer[1], buffer[2], buffer[3], buffer[4] }; + } + else + { + // read any data + data = new byte[size]; + Array.Copy(buffer, 0, data, 0, data.Length); + } + + ProcessBuffer(ref data); + + ReceiverUpdate(); + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + Close(); + } + } + } + + private void ProcessBuffer(ref byte[] data) + { + if (data.Length == 1) + { + return; + } + + Primitive primitive = new Primitive(data); + var package = primitive.Get.Class(); + + // invalid package + if (!primitive.IsValid) + { + primitive.Reset(); + return; + } + + MessageType messageType = (MessageType)package.Type; + + switch (messageType) + { + case MessageType.Unreliable: + { + var bytes = package.Data; + PushResult(ref bytes, MessageType.Unreliable); + break; + } + case MessageType.UnreliableOrdered: + { + // TODO: + break; + } + case MessageType.Reliable: + { + // TODO: + break; + } + case MessageType.ReliableUnordered: + { + // TODO: + break; + } + default: + { + throw new Exception($"{messageType} isn't supported."); + } + } + } + + private void ReliableQueueTask() + { + try + { + if (_reliableQueueList.Count > 0) + { + (uint id, byte[] data)[] array; + + lock (_queueLock) + { + array = _reliableQueueList.ToArray(); + } + + if (array.Length > 0) + { + foreach (var value in array) + { + var package = value; + SendRaw(Host, ref package.data); + } + } + } + + if (_reliableOrderedQueueList.Count > 0) + { + (uint id, byte[] data)[] array; + + lock (_queueLock) + { + array = _reliableOrderedQueueList.ToArray(); + } + + if (array.Length > 0) + { + foreach (var value in array) + { + var package = value; + SendRaw(Host, ref package.data); + } + } + } + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + } + } } } }