Skip to content

Commit

Permalink
reimpl. RUDP.Server
Browse files Browse the repository at this point in the history
  • Loading branch information
alec1o committed Jul 3, 2024
1 parent 8010ff9 commit f915f30
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/rudp/partials/RUDP.Server.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public static partial class RUDP
{
public partial class Server : IRUDP.Server
{
private readonly ServerOn _on;
internal readonly ServerOn _on;
private readonly ServerTo _to;

public Server()
Expand Down
53 changes: 25 additions & 28 deletions src/rudp/partials/RUDP.ServerOn.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,37 @@ namespace Netly
{
public static partial class RUDP
{
public partial class Server
internal class ServerOn : IRUDP.ServerOn
{
private class ServerOn : IRUDP.ServerOn
{
public EventHandler<IRUDP.Client> OnAccept;
public EventHandler OnClose;
public EventHandler<Exception> OnError;
public EventHandler<Socket> OnModify;
public EventHandler OnOpen;
public EventHandler<IRUDP.Client> OnAccept;
public EventHandler OnClose;
public EventHandler<Exception> OnError;
public EventHandler<Socket> OnModify;
public EventHandler OnOpen;

public void Open(Action callback)
{
OnOpen += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke());
}
public void Open(Action callback)
{
OnOpen += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke());
}

public void Error(Action<Exception> callback)
{
OnError += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event));
}
public void Error(Action<Exception> callback)
{
OnError += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event));
}

public void Close(Action callback)
{
OnClose += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke());
}
public void Close(Action callback)
{
OnClose += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke());
}

public void Modify(Action<Socket> callback)
{
OnModify += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event));
}
public void Modify(Action<Socket> callback)
{
OnModify += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event));
}

public void Accept(Action<IRUDP.Client> callback)
{
OnAccept += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event));
}
public void Accept(Action<IRUDP.Client> callback)
{
OnAccept += (@object, @event) => Env.MainThread.Add(() => callback?.Invoke(@event));
}
}
}
Expand Down
250 changes: 239 additions & 11 deletions src/rudp/partials/RUDP.ServerTo.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Byter;
using Netly.Interfaces;
Expand All @@ -12,57 +15,282 @@ public partial class RUDP
private class ServerTo : IRUDP.ServerTo
{
public Host Host { get; private set; }
public bool IsOpened { get; private set; }
public bool IsOpened => _socket != null && !_isClosed;
private readonly Server _server;

private readonly List<Client> _clients;
private readonly object _clientsLocker, _contentsLooker;
private bool _isOpeningOrClosing, _isClosed;
private List<(Host host, byte[] data)> _contents;
private ServerOn On => _server._on;
private Socket _socket;

public ServerTo(Server server)
{
Host = Host.Default;
_server = server;
_clients = new List<Client>();
_contents = new List<(Host host, byte[] data)>();
_clientsLocker = new object();
_contentsLooker = new object();
_socket = null;
_isOpeningOrClosing = false;
_isClosed = true;
}

public IRUDP.Client[] GetClients()
{
throw new NotImplementedException();
lock (_clientsLocker)
{
return _clients.Select(x => (IRUDP.Client)x).ToArray();
}
}

public Task Open(Host host)
{
throw new NotImplementedException();
if (_isClosed || _isOpeningOrClosing) return Task.CompletedTask;

_isOpeningOrClosing = true;

return Task.Run(() =>
{
try
{
_socket = new Socket(host.AddressFamily, SocketType.Dgram, ProtocolType.Udp);

On.OnModify?.Invoke(null, _socket);

_socket.Bind(host.EndPoint);

Host = new Host(_socket.LocalEndPoint);

_isClosed = false;

InitAccept();

On.OnOpen?.Invoke(null, null);
}
catch (Exception e)
{
_isClosed = true;
On.OnError?.Invoke(null, e);
}
finally
{
_isOpeningOrClosing = false;
}
});
}

public Task Close()
{
throw new NotImplementedException();
if (_isClosed || _isOpeningOrClosing) return Task.CompletedTask;

_isOpeningOrClosing = true;

return Task.Run(() =>
{
try
{
_socket?.Shutdown(SocketShutdown.Both);

Client[] clients;

lock (_clientsLocker)
{
clients = _clients.ToArray();
_clients.Clear();
}

if (clients.Length > 0)
foreach (var client in clients)
client.To.Close().Wait();

_socket?.Close();
_socket?.Dispose();
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
}
finally
{
_socket = null;
_isClosed = true;
_isOpeningOrClosing = false;
On.OnClose?.Invoke(null, null);
}
});
}

public void DataBroadcast(byte[] data, MessageType messageType)
{
throw new NotImplementedException();
if (!IsOpened || data == null || data.Length <= 0) return;

Broadcast(data, messageType);
}

public void DataBroadcast(string data, MessageType messageType)
{
throw new NotImplementedException();
if (!IsOpened || data == null || data.Length <= 0) return;

Broadcast(data.GetBytes(), messageType);
}

public void DataBroadcast(string data, MessageType messageType, Encoding encoding)
{
throw new NotImplementedException();
if (!IsOpened || data == null || data.Length <= 0) return;

Broadcast(data.GetBytes(encoding), messageType);
}

public void EventBroadcast(string name, byte[] data, MessageType messageType)
{
throw new NotImplementedException();
if (!IsOpened || name == null || name.Length <= 0 || data == null || data.Length <= 0) return;

Broadcast(name, data, messageType);
}

public void EventBroadcast(string name, string data, MessageType messageType)
{
throw new NotImplementedException();
if (!IsOpened || name == null || name.Length <= 0 || data == null || data.Length <= 0) return;

Broadcast(name, data.GetBytes(), messageType);
}

public void EventBroadcast(string name, string data, MessageType messageType, Encoding encoding)
{
throw new NotImplementedException();
if (!IsOpened || name == null || name.Length <= 0 || data == null || data.Length <= 0) return;

Broadcast(name, data.GetBytes(encoding), messageType);
}

private void Broadcast(byte[] data, MessageType messageType)
{
try
{
Client[] clients;

lock (_clientsLocker)
{
clients = _clients.ToArray();
}

if (clients.Length > 0)
foreach (var client in clients)
client?.To.Data(data, messageType);
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
}
}

private void Broadcast(string name, byte[] data, MessageType messageType)
{
try
{
Client[] clients;

lock (_clientsLocker)
{
clients = _clients.ToArray();
}

if (clients.Length > 0)
foreach (var client in clients)
client?.To.Data(data, messageType);
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
}
}


private void InitAccept()
{
var length = (int)_socket.GetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer);
var buffer = new byte[length > 0 ? length : 4096];
var remoteEndPoint = Host.EndPoint;

AcceptUpdate();

new Thread(() => ContentUpdate()).Start();

void AcceptUpdate()
{
if (!IsOpened)
{
Close();
return;
}

_socket.BeginReceiveFrom(buffer, 0, buffer.Length, SocketFlags.None, ref remoteEndPoint,
AcceptCallback,
null);
}

void AcceptCallback(IAsyncResult result)
{
try
{
var size = _socket.EndReceiveFrom(result, ref remoteEndPoint);

if (size <= 0)
{
AcceptUpdate();
return;
}

var data = new byte[size];

Array.Copy(buffer, 0, data, 0, data.Length);

lock (_contentsLooker)
{
_contents.Add((new Host(remoteEndPoint), data));
}
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
}

AcceptUpdate();
}
}

private void ContentUpdate()
{
while (IsOpened)
{
// ReSharper disable once InconsistentlySynchronizedField
if (_contents.Count <= 0) continue;

(Host host, byte[] data) value;

lock (_contentsLooker)
{
// recheck on thread safe area
if (_contents.Count <= 0) continue;

// get first element
value = _contents[0];

// remove first element
_contents.RemoveAt(0);
}

try
{
// TODO: implement this
_ = value;
}
catch (Exception e)
{
NetlyEnvironment.Logger.Create(e);
}
}
}
}
}
Expand Down

0 comments on commit f915f30

Please sign in to comment.