Skip to content

Commit

Permalink
Merge pull request #5 from NethermindEth/handlingNullExceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
rubo committed Jul 31, 2023
2 parents 823cee7 + 8054b28 commit fe619c1
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 22 deletions.
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<GeneratePackageOnBuild Condition="'$(Configuration)'=='Package'">True</GeneratePackageOnBuild>
<IncludeSymbols>true</IncludeSymbols>

<PackageVersion>1.0.0</PackageVersion>
<PackageVersion>1.0.1</PackageVersion>
<Version>$(PackageVersion)</Version>
</PropertyGroup>

Expand Down
104 changes: 83 additions & 21 deletions src/DotNetty.Common/ThreadLocalPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public override void Release<T>(T value)
{
throw new InvalidOperationException("recycled already");
}

stack.Push(this);
}
}
Expand Down Expand Up @@ -107,6 +108,7 @@ internal Head(StrongBox<int> availableSharedCapacity, StrongBox<int> weakTableCo
{
Interlocked.Decrement(ref this.weakTableCounter.Value);
}

if (this.availableSharedCapacity == null)
{
return;
Expand Down Expand Up @@ -138,13 +140,14 @@ internal bool ReserveSpace(int space)
internal static bool ReserveSpace(StrongBox<int> availableSharedCapacity, int space)
{
Debug.Assert(space >= 0);
for (; ; )
for (;;)
{
int available = Volatile.Read(ref availableSharedCapacity.Value);
if (available < space)
{
return false;
}

if (Interlocked.CompareExchange(ref availableSharedCapacity.Value, available - space, available) == available)
{
return true;
Expand All @@ -155,7 +158,9 @@ internal static bool ReserveSpace(StrongBox<int> availableSharedCapacity, int sp

// chain of data items
readonly Head head;

Link tail;

// pointer to another queue of delayed items for the same stack
internal WeakOrderQueue next;
internal readonly WeakReference<Thread> owner;
Expand Down Expand Up @@ -220,10 +225,12 @@ internal void Add(DefaultHandle handle)
// Drop it.
return;
}

// We allocate a Link so reserve the space
this.tail = tail = tail.next = new Link();
writeIndex = tail.WriteIndex;
}

tail.elements[writeIndex] = handle;
handle.Stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
Expand All @@ -236,7 +243,7 @@ internal void Add(DefaultHandle handle)
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
internal bool Transfer(Stack dst)
{
Link head = this.head.link;
Link head = this.head?.link;
if (head == null)
{
return false;
Expand All @@ -248,6 +255,7 @@ internal bool Transfer(Stack dst)
{
return false;
}

this.head.link = head = head.next;
}

Expand All @@ -259,6 +267,11 @@ internal bool Transfer(Stack dst)
return false;
}

if (dst?.elements == null)
{
return false;
}

int dstSize = dst.size;
int expectedCapacity = dstSize + srcSize;

Expand All @@ -273,9 +286,19 @@ internal bool Transfer(Stack dst)
DefaultHandle[] srcElems = head.elements;
DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
if (head.elements == null)
{
return false;
}

for (int i = srcStart; i < srcEnd; i++)
{
DefaultHandle element = srcElems[i];
if (element == null)
{
return false;
}

if (element.recycleId == 0)
{
element.recycleId = element.lastRecycledId;
Expand All @@ -284,13 +307,15 @@ internal bool Transfer(Stack dst)
{
throw new InvalidOperationException("recycled already");
}

srcElems[i] = null;

if (dst.DropHandle(element))
{
// Drop the object.
continue;
}

element.Stack = dst;
dstElems[newDstSize++] = element;
}
Expand All @@ -307,6 +332,7 @@ internal bool Transfer(Stack dst)
{
return false;
}

dst.size = newDstSize;
return true;
}
Expand Down Expand Up @@ -344,8 +370,13 @@ protected sealed class Stack
WeakOrderQueue cursorQueue, prevQueue;
volatile WeakOrderQueue headQueue;

internal Stack(ThreadLocalPool parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues)
internal Stack(
ThreadLocalPool parent,
Thread thread,
int maxCapacity,
int maxSharedCapacityFactor,
int ratioMask,
int maxDelayedQueues)
{
this.parent = parent;
this.threadRef = new WeakReference<Thread>(thread);
Expand Down Expand Up @@ -410,6 +441,7 @@ void PushNow(DefaultHandle item)
{
throw new InvalidOperationException("released already");
}

item.recycleId = item.lastRecycledId = ownThreadId;

int size = this.size;
Expand All @@ -418,6 +450,7 @@ void PushNow(DefaultHandle item)
// Hit the maximum capacity - drop the possibly youngest object.
return;
}

if (size == this.elements.Length)
{
Array.Resize(ref this.elements, Math.Min(size << 1, this.maxCapacity));
Expand All @@ -443,12 +476,14 @@ void PushLater(DefaultHandle item, Thread thread)
delayedRecycled.Add(this, WeakOrderQueue.Dummy);
return;
}

// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.Allocate(this, thread, countedWeakTable)) == null)
{
// drop object
return;
}

delayedRecycled.Add(this, queue);
}
else if (queue == WeakOrderQueue.Dummy)
Expand All @@ -469,8 +504,10 @@ internal bool DropHandle(DefaultHandle handle)
// Drop the object.
return true;
}

handle.hasBeenRecycled = true;
}

return false;
}

Expand All @@ -486,15 +523,18 @@ internal bool TryPop(out DefaultHandle item)
item = null;
return false;
}

size = this.size;
}

size--;
DefaultHandle ret = this.elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId)
{
throw new InvalidOperationException("recycled multiple times");
}

ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
Expand Down Expand Up @@ -564,6 +604,7 @@ bool ScavengeSome()
}
}
}

if (prev != null)
{
prev.Next = next;
Expand Down Expand Up @@ -604,6 +645,7 @@ public class CountedWeakTable

internal readonly StrongBox<int> Counter = new StrongBox<int>();
}

protected override CountedWeakTable GetInitialValue() => new CountedWeakTable();
}

Expand All @@ -612,26 +654,31 @@ static ThreadLocalPool()
// In the future, we might have different maxCapacity for different object types.
// e.g. io.netty.recycler.maxCapacity.writeTask
// io.netty.recycler.maxCapacity.outboundBuffer
int maxCapacityPerThread = SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacity", DefaultInitialMaxCapacityPerThread));
int maxCapacityPerThread = SystemPropertyUtil.GetInt(
"io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacity", DefaultInitialMaxCapacityPerThread));
if (maxCapacityPerThread < 0)
{
maxCapacityPerThread = DefaultInitialMaxCapacityPerThread;
}

DefaultMaxCapacityPerThread = maxCapacityPerThread;

DefaultMaxSharedCapacityFactor = Math.Max(2,
SystemPropertyUtil.GetInt("io.netty.recycler.maxSharedCapacityFactor",
2));
DefaultMaxSharedCapacityFactor = Math.Max(
2,
SystemPropertyUtil.GetInt(
"io.netty.recycler.maxSharedCapacityFactor",
2));

DefaultMaxDelayedQueuesPerThread = Math.Max(0,
SystemPropertyUtil.GetInt("io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
Environment.ProcessorCount * 2));
DefaultMaxDelayedQueuesPerThread = Math.Max(
0,
SystemPropertyUtil.GetInt(
"io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
Environment.ProcessorCount * 2));

LinkCapacity = MathUtil.SafeFindNextPositivePowerOfTwo(
Math.Max(SystemPropertyUtil.GetInt("io.netty.recycler.linkCapacity", 16), 16));
Math.Max(SystemPropertyUtil.GetInt("io.netty.recycler.linkCapacity", 16), 16));

// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
Expand Down Expand Up @@ -663,12 +710,15 @@ static ThreadLocalPool()
}

public ThreadLocalPool(int maxCapacityPerThread)
: this (maxCapacityPerThread, DefaultMaxSharedCapacityFactor, DefaultRatio, DefaultMaxDelayedQueuesPerThread)
: this(maxCapacityPerThread, DefaultMaxSharedCapacityFactor, DefaultRatio, DefaultMaxDelayedQueuesPerThread)
{
}

public ThreadLocalPool(int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread)
public ThreadLocalPool(
int maxCapacityPerThread,
int maxSharedCapacityFactor,
int ratio,
int maxDelayedQueuesPerThread)
{
this.ratioMask = MathUtil.SafeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacityPerThread <= 0)
Expand Down Expand Up @@ -718,8 +768,13 @@ public ThreadLocalPool(Func<Handle, T> valueFactory, int maxCapacityPerThread, i
{
}

public ThreadLocalPool(Func<Handle, T> valueFactory, int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread, bool preCreate = false)
public ThreadLocalPool(
Func<Handle, T> valueFactory,
int maxCapacityPerThread,
int maxSharedCapacityFactor,
int ratio,
int maxDelayedQueuesPerThread,
bool preCreate = false)
: base(maxCapacityPerThread, maxSharedCapacityFactor, ratio, maxDelayedQueuesPerThread)
{
Contract.Requires(valueFactory != null);
Expand All @@ -743,6 +798,7 @@ public T Take()
{
handle = CreateValue(stack);
}

return (T)handle.Value;
}

Expand All @@ -768,15 +824,21 @@ public ThreadLocalStack(ThreadLocalPool<T> owner)

protected override Stack GetInitialValue()
{
var stack = new Stack(this.owner, Thread.CurrentThread, this.owner.maxCapacityPerThread,
this.owner.maxSharedCapacityFactor, this.owner.ratioMask, this.owner.maxDelayedQueuesPerThread);
var stack = new Stack(
this.owner,
Thread.CurrentThread,
this.owner.maxCapacityPerThread,
this.owner.maxSharedCapacityFactor,
this.owner.ratioMask,
this.owner.maxDelayedQueuesPerThread);
if (this.owner.preCreate)
{
for (int i = 0; i < this.owner.maxCapacityPerThread; i++)
{
stack.Push(this.owner.CreateValue(stack));
}
}

return stack;
}

Expand Down

0 comments on commit fe619c1

Please sign in to comment.