Skip to content

Commit

Permalink
configureable limit for max parallel sends
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-heinz committed May 13, 2020
1 parent 8bc5d80 commit aa5c1ce
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 7 deletions.
2 changes: 1 addition & 1 deletion .version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.2
1.0.3
19 changes: 17 additions & 2 deletions Arrowgene.Networking/Tcp/Server/AsyncEvent/AsyncEventClient.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace Arrowgene.Networking.Tcp.Server.AsyncEvent
{
public class AsyncEventClient : ITcpSocket
{
private readonly SemaphoreSlim _maxSimultaneousSends;

public string Identity { get; }
public IPAddress RemoteIpAddress { get; }
public ushort Port { get; }
Expand All @@ -30,9 +33,11 @@ public bool IsAlive
private readonly AsyncEventServer _server;
private readonly object _lock;

public AsyncEventClient(Socket socket, SocketAsyncEventArgs readEventArgs, AsyncEventServer server, int uoo)
public AsyncEventClient(Socket socket, SocketAsyncEventArgs readEventArgs, AsyncEventServer server, int uoo,
int maxSimultaneousSends)
{
_lock = new object();
_maxSimultaneousSends = new SemaphoreSlim(maxSimultaneousSends, maxSimultaneousSends);
_isAlive = true;
Socket = socket;
ReadEventArgs = readEventArgs;
Expand All @@ -53,6 +58,16 @@ public void Send(byte[] data)
_server.Send(this, data);
}

public void ReleaseSend()
{
_maxSimultaneousSends.Release();
}

public void WaitSend()
{
_maxSimultaneousSends.Wait();
}

public void Close()
{
lock (_lock)
Expand All @@ -79,4 +94,4 @@ public void Close()
ReadEventArgs = null;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ public void Send(AsyncEventClient client, byte[] data)
_maxNumberSendOperations.Release();
return;
}

AsyncEventToken token = (AsyncEventToken) writeEventArgs.UserToken;
token.Assign(client, data);
client.WaitSend();
StartSend(writeEventArgs);
}

Expand Down Expand Up @@ -427,7 +427,8 @@ private void ProcessAccept(SocketAsyncEventArgs acceptEventArg)
acceptSocket,
readEventArgs,
this,
unitOfOrder
unitOfOrder,
_settings.MaxSimultaneousSendsPerClient
);
_clients.Add(client);
readEventArgs.UserToken = client;
Expand Down Expand Up @@ -603,6 +604,7 @@ private void ProcessSend(SocketAsyncEventArgs writeEventArgs)
private void ReleaseWrite(SocketAsyncEventArgs writeEventArgs)
{
AsyncEventToken token = (AsyncEventToken) writeEventArgs.UserToken;
token.Client.ReleaseSend();
token.Reset();
_sendPool.Push(writeEventArgs);
_maxNumberSendOperations.Release();
Expand Down Expand Up @@ -643,4 +645,4 @@ private void CheckSocketTimeout()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class AsyncEventSettings : ICloneable
[DataMember(Order = 4)] public int Retries { get; set; }

[DataMember(Order = 5)] public int MaxUnitOfOrder { get; set; }

[DataMember(Order = 6)] public int MaxSimultaneousSendsPerClient { get; set; }

[DataMember(Order = 9)] public int SocketTimeoutSeconds { get; set; }

Expand All @@ -31,6 +33,7 @@ public AsyncEventSettings()
SocketSettings = new SocketSettings();
Identity = "";
MaxUnitOfOrder = 1;
MaxSimultaneousSendsPerClient = 1;
SocketTimeoutSeconds = -1;
}

Expand All @@ -43,6 +46,7 @@ public AsyncEventSettings(AsyncEventSettings settings)
Retries = settings.Retries;
SocketSettings = new SocketSettings(settings.SocketSettings);
MaxUnitOfOrder = settings.MaxUnitOfOrder;
MaxSimultaneousSendsPerClient = settings.NumSimultaneouslyWriteOperations;
SocketTimeoutSeconds = settings.SocketTimeoutSeconds;
}

Expand All @@ -51,4 +55,4 @@ public object Clone()
return new AsyncEventSettings(this);
}
}
}
}

0 comments on commit aa5c1ce

Please sign in to comment.