From f915f305912e4060c466f0bb58b386130bf36625 Mon Sep 17 00:00:00 2001 From: "Alecio Furanze (Ale)" Date: Wed, 3 Jul 2024 16:58:39 +0200 Subject: [PATCH] reimpl. RUDP.Server --- src/rudp/partials/RUDP.Server.cs | 2 +- src/rudp/partials/RUDP.ServerOn.cs | 53 +++--- src/rudp/partials/RUDP.ServerTo.cs | 250 +++++++++++++++++++++++++++-- 3 files changed, 265 insertions(+), 40 deletions(-) diff --git a/src/rudp/partials/RUDP.Server.cs b/src/rudp/partials/RUDP.Server.cs index 0a7a8683..08ef3904 100644 --- a/src/rudp/partials/RUDP.Server.cs +++ b/src/rudp/partials/RUDP.Server.cs @@ -8,7 +8,7 @@ public static partial class RUDP { public partial class Server : IRUDP.Server { - private readonly ServerOn _on; + internal readonly ServerOn _on; private readonly ServerTo _to; public Server() diff --git a/src/rudp/partials/RUDP.ServerOn.cs b/src/rudp/partials/RUDP.ServerOn.cs index cd9b4492..9e3a1859 100644 --- a/src/rudp/partials/RUDP.ServerOn.cs +++ b/src/rudp/partials/RUDP.ServerOn.cs @@ -7,40 +7,37 @@ namespace Netly { public static partial class RUDP { - public partial class Server + internal class ServerOn : IRUDP.ServerOn { - private class ServerOn : IRUDP.ServerOn - { - public EventHandler OnAccept; - public EventHandler OnClose; - public EventHandler OnError; - public EventHandler OnModify; - public EventHandler OnOpen; + public EventHandler OnAccept; + public EventHandler OnClose; + public EventHandler OnError; + public EventHandler OnModify; + public EventHandler OnOpen; - public void Open(Action callback) - { - OnOpen += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke()); - } + public void Open(Action callback) + { + OnOpen += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke()); + } - public void Error(Action callback) - { - OnError += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event)); - } + public void Error(Action callback) + { + OnError += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event)); + } - public void Close(Action callback) - { - OnClose += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke()); - } + public void Close(Action callback) + { + OnClose += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke()); + } - public void Modify(Action callback) - { - OnModify += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event)); - } + public void Modify(Action callback) + { + OnModify += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event)); + } - public void Accept(Action callback) - { - OnAccept += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event)); - } + public void Accept(Action callback) + { + OnAccept += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event)); } } } diff --git a/src/rudp/partials/RUDP.ServerTo.cs b/src/rudp/partials/RUDP.ServerTo.cs index 5aa3f2e2..463ab11f 100644 --- a/src/rudp/partials/RUDP.ServerTo.cs +++ b/src/rudp/partials/RUDP.ServerTo.cs @@ -1,6 +1,9 @@ using System; using System.Collections.Generic; +using System.Linq; +using System.Net.Sockets; using System.Text; +using System.Threading; using System.Threading.Tasks; using Byter; using Netly.Interfaces; @@ -12,57 +15,282 @@ public partial class RUDP private class ServerTo : IRUDP.ServerTo { public Host Host { get; private set; } - public bool IsOpened { get; private set; } + public bool IsOpened => _socket != null && !_isClosed; private readonly Server _server; - + private readonly List _clients; + private readonly object _clientsLocker, _contentsLooker; + private bool _isOpeningOrClosing, _isClosed; + private List<(Host host, byte[] data)> _contents; + private ServerOn On => _server._on; + private Socket _socket; + public ServerTo(Server server) { + Host = Host.Default; _server = server; + _clients = new List(); + _contents = new List<(Host host, byte[] data)>(); + _clientsLocker = new object(); + _contentsLooker = new object(); + _socket = null; + _isOpeningOrClosing = false; + _isClosed = true; } public IRUDP.Client[] GetClients() { - throw new NotImplementedException(); + lock (_clientsLocker) + { + return _clients.Select(x => (IRUDP.Client)x).ToArray(); + } } public Task Open(Host host) { - throw new NotImplementedException(); + if (_isClosed || _isOpeningOrClosing) return Task.CompletedTask; + + _isOpeningOrClosing = true; + + return Task.Run(() => + { + try + { + _socket = new Socket(host.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + + On.OnModify?.Invoke(null, _socket); + + _socket.Bind(host.EndPoint); + + Host = new Host(_socket.LocalEndPoint); + + _isClosed = false; + + InitAccept(); + + On.OnOpen?.Invoke(null, null); + } + catch (Exception e) + { + _isClosed = true; + On.OnError?.Invoke(null, e); + } + finally + { + _isOpeningOrClosing = false; + } + }); } public Task Close() { - throw new NotImplementedException(); + if (_isClosed || _isOpeningOrClosing) return Task.CompletedTask; + + _isOpeningOrClosing = true; + + return Task.Run(() => + { + try + { + _socket?.Shutdown(SocketShutdown.Both); + + Client[] clients; + + lock (_clientsLocker) + { + clients = _clients.ToArray(); + _clients.Clear(); + } + + if (clients.Length > 0) + foreach (var client in clients) + client.To.Close().Wait(); + + _socket?.Close(); + _socket?.Dispose(); + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + } + finally + { + _socket = null; + _isClosed = true; + _isOpeningOrClosing = false; + On.OnClose?.Invoke(null, null); + } + }); } public void DataBroadcast(byte[] data, MessageType messageType) { - throw new NotImplementedException(); + if (!IsOpened || data == null || data.Length <= 0) return; + + Broadcast(data, messageType); } public void DataBroadcast(string data, MessageType messageType) { - throw new NotImplementedException(); + if (!IsOpened || data == null || data.Length <= 0) return; + + Broadcast(data.GetBytes(), messageType); } public void DataBroadcast(string data, MessageType messageType, Encoding encoding) { - throw new NotImplementedException(); + if (!IsOpened || data == null || data.Length <= 0) return; + + Broadcast(data.GetBytes(encoding), messageType); } public void EventBroadcast(string name, byte[] data, MessageType messageType) { - throw new NotImplementedException(); + if (!IsOpened || name == null || name.Length <= 0 || data == null || data.Length <= 0) return; + + Broadcast(name, data, messageType); } public void EventBroadcast(string name, string data, MessageType messageType) { - throw new NotImplementedException(); + if (!IsOpened || name == null || name.Length <= 0 || data == null || data.Length <= 0) return; + + Broadcast(name, data.GetBytes(), messageType); } public void EventBroadcast(string name, string data, MessageType messageType, Encoding encoding) { - throw new NotImplementedException(); + if (!IsOpened || name == null || name.Length <= 0 || data == null || data.Length <= 0) return; + + Broadcast(name, data.GetBytes(encoding), messageType); + } + + private void Broadcast(byte[] data, MessageType messageType) + { + try + { + Client[] clients; + + lock (_clientsLocker) + { + clients = _clients.ToArray(); + } + + if (clients.Length > 0) + foreach (var client in clients) + client?.To.Data(data, messageType); + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + } + } + + private void Broadcast(string name, byte[] data, MessageType messageType) + { + try + { + Client[] clients; + + lock (_clientsLocker) + { + clients = _clients.ToArray(); + } + + if (clients.Length > 0) + foreach (var client in clients) + client?.To.Data(data, messageType); + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + } + } + + + private void InitAccept() + { + var length = (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer); + var buffer = new byte[length > 0 ? length : 4096]; + var remoteEndPoint = Host.EndPoint; + + AcceptUpdate(); + + new Thread(() => ContentUpdate()).Start(); + + void AcceptUpdate() + { + if (!IsOpened) + { + Close(); + return; + } + + _socket.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref remoteEndPoint, + AcceptCallback, + null); + } + + void AcceptCallback(IAsyncResult result) + { + try + { + var size = _socket.EndReceiveFrom(result, ref remoteEndPoint); + + if (size <= 0) + { + AcceptUpdate(); + return; + } + + var data = new byte[size]; + + Array.Copy(buffer, 0, data, 0, data.Length); + + lock (_contentsLooker) + { + _contents.Add((new Host(remoteEndPoint), data)); + } + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + } + + AcceptUpdate(); + } + } + + private void ContentUpdate() + { + while (IsOpened) + { + // ReSharper disable once InconsistentlySynchronizedField + if (_contents.Count <= 0) continue; + + (Host host, byte[] data) value; + + lock (_contentsLooker) + { + // recheck on thread safe area + if (_contents.Count <= 0) continue; + + // get first element + value = _contents[0]; + + // remove first element + _contents.RemoveAt(0); + } + + try + { + // TODO: implement this + _ = value; + } + catch (Exception e) + { + NetlyEnvironment.Logger.Create(e); + } + } } } }