diff --git a/src/Directory.Build.props b/src/Directory.Build.props index c5e968a6..a250448a 100644 --- a/src/Directory.Build.props +++ b/src/Directory.Build.props @@ -16,7 +16,7 @@ True true - 1.0.0 + 1.0.1 $(PackageVersion) diff --git a/src/DotNetty.Common/ThreadLocalPool.cs b/src/DotNetty.Common/ThreadLocalPool.cs index 327cb15e..a643101c 100644 --- a/src/DotNetty.Common/ThreadLocalPool.cs +++ b/src/DotNetty.Common/ThreadLocalPool.cs @@ -56,6 +56,7 @@ public override void Release(T value) { throw new InvalidOperationException("recycled already"); } + stack.Push(this); } } @@ -107,6 +108,7 @@ internal Head(StrongBox availableSharedCapacity, StrongBox weakTableCo { Interlocked.Decrement(ref this.weakTableCounter.Value); } + if (this.availableSharedCapacity == null) { return; @@ -138,13 +140,14 @@ internal bool ReserveSpace(int space) internal static bool ReserveSpace(StrongBox availableSharedCapacity, int space) { Debug.Assert(space >= 0); - for (; ; ) + for (;;) { int available = Volatile.Read(ref availableSharedCapacity.Value); if (available < space) { return false; } + if (Interlocked.CompareExchange(ref availableSharedCapacity.Value, available - space, available) == available) { return true; @@ -155,7 +158,9 @@ internal static bool ReserveSpace(StrongBox availableSharedCapacity, int sp // chain of data items readonly Head head; + Link tail; + // pointer to another queue of delayed items for the same stack internal WeakOrderQueue next; internal readonly WeakReference owner; @@ -220,10 +225,12 @@ internal void Add(DefaultHandle handle) // Drop it. return; } + // We allocate a Link so reserve the space this.tail = tail = tail.next = new Link(); writeIndex = tail.WriteIndex; } + tail.elements[writeIndex] = handle; handle.Stack = null; // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread; @@ -236,7 +243,7 @@ internal void Add(DefaultHandle handle) // transfer as many items as we can from this queue to the stack, returning true if any were transferred internal bool Transfer(Stack dst) { - Link head = this.head.link; + Link head = this.head?.link; if (head == null) { return false; @@ -248,6 +255,7 @@ internal bool Transfer(Stack dst) { return false; } + this.head.link = head = head.next; } @@ -259,6 +267,11 @@ internal bool Transfer(Stack dst) return false; } + if (dst?.elements == null) + { + return false; + } + int dstSize = dst.size; int expectedCapacity = dstSize + srcSize; @@ -273,9 +286,19 @@ internal bool Transfer(Stack dst) DefaultHandle[] srcElems = head.elements; DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; + if (head.elements == null) + { + return false; + } + for (int i = srcStart; i < srcEnd; i++) { DefaultHandle element = srcElems[i]; + if (element == null) + { + return false; + } + if (element.recycleId == 0) { element.recycleId = element.lastRecycledId; @@ -284,6 +307,7 @@ internal bool Transfer(Stack dst) { throw new InvalidOperationException("recycled already"); } + srcElems[i] = null; if (dst.DropHandle(element)) @@ -291,6 +315,7 @@ internal bool Transfer(Stack dst) // Drop the object. continue; } + element.Stack = dst; dstElems[newDstSize++] = element; } @@ -307,6 +332,7 @@ internal bool Transfer(Stack dst) { return false; } + dst.size = newDstSize; return true; } @@ -344,8 +370,13 @@ protected sealed class Stack WeakOrderQueue cursorQueue, prevQueue; volatile WeakOrderQueue headQueue; - internal Stack(ThreadLocalPool parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, - int ratioMask, int maxDelayedQueues) + internal Stack( + ThreadLocalPool parent, + Thread thread, + int maxCapacity, + int maxSharedCapacityFactor, + int ratioMask, + int maxDelayedQueues) { this.parent = parent; this.threadRef = new WeakReference(thread); @@ -410,6 +441,7 @@ void PushNow(DefaultHandle item) { throw new InvalidOperationException("released already"); } + item.recycleId = item.lastRecycledId = ownThreadId; int size = this.size; @@ -418,6 +450,7 @@ void PushNow(DefaultHandle item) // Hit the maximum capacity - drop the possibly youngest object. return; } + if (size == this.elements.Length) { Array.Resize(ref this.elements, Math.Min(size << 1, this.maxCapacity)); @@ -443,12 +476,14 @@ void PushLater(DefaultHandle item, Thread thread) delayedRecycled.Add(this, WeakOrderQueue.Dummy); return; } + // Check if we already reached the maximum number of delayed queues and if we can allocate at all. if ((queue = WeakOrderQueue.Allocate(this, thread, countedWeakTable)) == null) { // drop object return; } + delayedRecycled.Add(this, queue); } else if (queue == WeakOrderQueue.Dummy) @@ -469,8 +504,10 @@ internal bool DropHandle(DefaultHandle handle) // Drop the object. return true; } + handle.hasBeenRecycled = true; } + return false; } @@ -486,8 +523,10 @@ internal bool TryPop(out DefaultHandle item) item = null; return false; } + size = this.size; } + size--; DefaultHandle ret = this.elements[size]; elements[size] = null; @@ -495,6 +534,7 @@ internal bool TryPop(out DefaultHandle item) { throw new InvalidOperationException("recycled multiple times"); } + ret.recycleId = 0; ret.lastRecycledId = 0; this.size = size; @@ -564,6 +604,7 @@ bool ScavengeSome() } } } + if (prev != null) { prev.Next = next; @@ -604,6 +645,7 @@ public class CountedWeakTable internal readonly StrongBox Counter = new StrongBox(); } + protected override CountedWeakTable GetInitialValue() => new CountedWeakTable(); } @@ -612,8 +654,9 @@ static ThreadLocalPool() // In the future, we might have different maxCapacity for different object types. // e.g. io.netty.recycler.maxCapacity.writeTask // io.netty.recycler.maxCapacity.outboundBuffer - int maxCapacityPerThread = SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacityPerThread", - SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacity", DefaultInitialMaxCapacityPerThread)); + int maxCapacityPerThread = SystemPropertyUtil.GetInt( + "io.netty.recycler.maxCapacityPerThread", + SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacity", DefaultInitialMaxCapacityPerThread)); if (maxCapacityPerThread < 0) { maxCapacityPerThread = DefaultInitialMaxCapacityPerThread; @@ -621,17 +664,21 @@ static ThreadLocalPool() DefaultMaxCapacityPerThread = maxCapacityPerThread; - DefaultMaxSharedCapacityFactor = Math.Max(2, - SystemPropertyUtil.GetInt("io.netty.recycler.maxSharedCapacityFactor", - 2)); + DefaultMaxSharedCapacityFactor = Math.Max( + 2, + SystemPropertyUtil.GetInt( + "io.netty.recycler.maxSharedCapacityFactor", + 2)); - DefaultMaxDelayedQueuesPerThread = Math.Max(0, - SystemPropertyUtil.GetInt("io.netty.recycler.maxDelayedQueuesPerThread", - // We use the same value as default EventLoop number - Environment.ProcessorCount * 2)); + DefaultMaxDelayedQueuesPerThread = Math.Max( + 0, + SystemPropertyUtil.GetInt( + "io.netty.recycler.maxDelayedQueuesPerThread", + // We use the same value as default EventLoop number + Environment.ProcessorCount * 2)); LinkCapacity = MathUtil.SafeFindNextPositivePowerOfTwo( - Math.Max(SystemPropertyUtil.GetInt("io.netty.recycler.linkCapacity", 16), 16)); + Math.Max(SystemPropertyUtil.GetInt("io.netty.recycler.linkCapacity", 16), 16)); // By default we allow one push to a Recycler for each 8th try on handles that were never recycled before. // This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation @@ -663,12 +710,15 @@ static ThreadLocalPool() } public ThreadLocalPool(int maxCapacityPerThread) - : this (maxCapacityPerThread, DefaultMaxSharedCapacityFactor, DefaultRatio, DefaultMaxDelayedQueuesPerThread) + : this(maxCapacityPerThread, DefaultMaxSharedCapacityFactor, DefaultRatio, DefaultMaxDelayedQueuesPerThread) { } - public ThreadLocalPool(int maxCapacityPerThread, int maxSharedCapacityFactor, - int ratio, int maxDelayedQueuesPerThread) + public ThreadLocalPool( + int maxCapacityPerThread, + int maxSharedCapacityFactor, + int ratio, + int maxDelayedQueuesPerThread) { this.ratioMask = MathUtil.SafeFindNextPositivePowerOfTwo(ratio) - 1; if (maxCapacityPerThread <= 0) @@ -718,8 +768,13 @@ public ThreadLocalPool(Func valueFactory, int maxCapacityPerThread, i { } - public ThreadLocalPool(Func valueFactory, int maxCapacityPerThread, int maxSharedCapacityFactor, - int ratio, int maxDelayedQueuesPerThread, bool preCreate = false) + public ThreadLocalPool( + Func valueFactory, + int maxCapacityPerThread, + int maxSharedCapacityFactor, + int ratio, + int maxDelayedQueuesPerThread, + bool preCreate = false) : base(maxCapacityPerThread, maxSharedCapacityFactor, ratio, maxDelayedQueuesPerThread) { Contract.Requires(valueFactory != null); @@ -743,6 +798,7 @@ public T Take() { handle = CreateValue(stack); } + return (T)handle.Value; } @@ -768,8 +824,13 @@ public ThreadLocalStack(ThreadLocalPool owner) protected override Stack GetInitialValue() { - var stack = new Stack(this.owner, Thread.CurrentThread, this.owner.maxCapacityPerThread, - this.owner.maxSharedCapacityFactor, this.owner.ratioMask, this.owner.maxDelayedQueuesPerThread); + var stack = new Stack( + this.owner, + Thread.CurrentThread, + this.owner.maxCapacityPerThread, + this.owner.maxSharedCapacityFactor, + this.owner.ratioMask, + this.owner.maxDelayedQueuesPerThread); if (this.owner.preCreate) { for (int i = 0; i < this.owner.maxCapacityPerThread; i++) @@ -777,6 +838,7 @@ protected override Stack GetInitialValue() stack.Push(this.owner.CreateValue(stack)); } } + return stack; }