Skip to content

Commit

Permalink
Ensure ticket updates are processed for task queue reference count
Browse files Browse the repository at this point in the history
It is possible that when the reference count is retrieved that
the ticket updates have not yet been processed, which means
that the region section at the queued position may not exist or may
become nonexistant while or after the task is being added.

This restructures the reference counting code to now refer to
both a reference counter and a flag indicating whether ticket
updates are processed.

Fixes #262
This issue shows the race condition where the region becomes
nonexistant.
  • Loading branch information
Spottedleaf committed Dec 3, 2024
1 parent f50f363 commit e160105
Showing 1 changed file with 98 additions and 94 deletions.
192 changes: 98 additions & 94 deletions patches/server/0003-Threaded-Regions.patch
Original file line number Diff line number Diff line change
Expand Up @@ -2129,10 +2129,10 @@ index 0000000000000000000000000000000000000000..fc053ded0c14b76a1c6c82b59d3fd320
+}
diff --git a/src/main/java/io/papermc/paper/threadedregions/RegionizedTaskQueue.java b/src/main/java/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983d0c65eba
index 0000000000000000000000000000000000000000..a2313b5b4c37e8536973a8ea0b371557ea912473
--- /dev/null
+++ b/src/main/java/io/papermc/paper/threadedregions/RegionizedTaskQueue.java
@@ -0,0 +1,803 @@
@@ -0,0 +1,807 @@
+package io.papermc.paper.threadedregions;
+
+import ca.spottedleaf.concurrentutil.collection.MultiThreadedQueue;
Expand Down Expand Up @@ -2208,7 +2208,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ public static final class WorldRegionTaskData {
+ private final ServerLevel world;
+ private final MultiThreadedQueue<Runnable> globalChunkTask = new MultiThreadedQueue<>();
+ private final ConcurrentLong2ReferenceChainedHashTable<AtomicLong> referenceCounters = new ConcurrentLong2ReferenceChainedHashTable<>();
+ private final ConcurrentLong2ReferenceChainedHashTable<ReferenceCountData> referenceCounters = new ConcurrentLong2ReferenceChainedHashTable<>();
+
+ public WorldRegionTaskData(final ServerLevel world) {
+ this.world = world;
Expand Down Expand Up @@ -2258,96 +2258,95 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.processTicketUpdates(CoordinateUtils.getChunkX(coord), CoordinateUtils.getChunkZ(coord));
+ }
+
+ private void decrementReference(final AtomicLong reference, final long coord) {
+ final long val = reference.decrementAndGet();
+ if (val == 0L) {
+ final int chunkX = CoordinateUtils.getChunkX(coord);
+ final int chunkZ = CoordinateUtils.getChunkZ(coord);
+ final ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock.Node ticketLock = this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.lock(chunkX, chunkZ);
+ try {
+ if (this.referenceCounters.remove(coord, reference) == reference) {
+ WorldRegionTaskData.this.removeTicket(coord);
+ } // else: race condition, something replaced our reference - not our issue anymore
+ } finally {
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.unlock(ticketLock);
+ }
+ } else if (val < 0L) {
+ throw new IllegalStateException("Reference count < 0: " + val);
+ // note: only call on acquired referenceCountData
+ private void ensureTicketAdded(final long coord, final ReferenceCountData referenceCountData) {
+ if (!referenceCountData.addedTicket) {
+ // fine if multiple threads do this, no removeTicket may be called for this coord due to reference count inc
+ this.addTicket(coord);
+ this.processTicketUpdates(coord);
+ referenceCountData.addedTicket = true;
+ }
+ }
+
+ private AtomicLong incrementReference(final long coord) {
+ final AtomicLong ret = this.referenceCounters.get(coord);
+ if (ret != null) {
+ // try to fast acquire counter
+ int failures = 0;
+ for (long curr = ret.get();;) {
+ if (curr == 0L) {
+ // failed to fast acquire as reference expired
+ break;
+ }
+ private void decrementReference(final ReferenceCountData referenceCountData, final long coord) {
+ if (!referenceCountData.decreaseReferenceCount()) {
+ return;
+ } // else: need to remove ticket
+
+ for (int i = 0; i < failures; ++i) {
+ ConcurrentUtil.backoff();
+ }
+ // note: it is possible that another thread increments and then removes the reference before we can, so
+ // use ifPresent
+ this.referenceCounters.computeIfPresent(coord, (final long keyInMap, final ReferenceCountData valueInMap) -> {
+ if (valueInMap.referenceCount.get() != 0L) {
+ return valueInMap;
+ }
+
+ if (curr == (curr = ret.compareAndExchange(curr, curr + 1L))) {
+ return ret;
+ }
+ // note: valueInMap may not be referenceCountData
+
+ ++failures;
+ }
+ // possible to invoke this outside of the compute call, but not required and requires additional logic
+ WorldRegionTaskData.this.removeTicket(keyInMap);
+
+ return null;
+ });
+ }
+
+ private ReferenceCountData incrementReference(final long coord) {
+ ReferenceCountData referenceCountData = this.referenceCounters.get(coord);
+
+ if (referenceCountData != null && referenceCountData.addCount()) {
+ this.ensureTicketAdded(coord, referenceCountData);
+ return referenceCountData;
+ }
+
+ // slow acquire
+ final int chunkX = CoordinateUtils.getChunkX(coord);
+ final int chunkZ = CoordinateUtils.getChunkZ(coord);
+ final ca.spottedleaf.concurrentutil.lock.ReentrantAreaLock.Node ticketLock = this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.lock(chunkX, chunkZ);
+ final AtomicLong ret2;
+ final boolean processTicketUpdates;
+ try {
+ final AtomicLong replace = new AtomicLong(1L);
+ final AtomicLong valueInMap = this.referenceCounters.putIfAbsent(coord, replace);
+ referenceCountData = this.referenceCounters.compute(coord, (final long keyInMap, final ReferenceCountData valueInMap) -> {
+ if (valueInMap == null) {
+ // replaced, we should usually be here
+ this.addTicket(coord);
+ ret2 = replace;
+ processTicketUpdates = true;
+ } else {
+ processTicketUpdates = false;
+ int failures = 0;
+ for (long curr = valueInMap.get();;) {
+ if (curr == 0L) {
+ // don't need to add ticket here, since ticket is only removed during the lock
+ // we just need to replace the value in the map so that the thread removing fails and doesn't
+ // remove the ticket (see decrementReference)
+ this.referenceCounters.put(coord, replace);
+ ret2 = replace;
+ break;
+ }
+ // sets reference count to 1
+ return new ReferenceCountData();
+ }
+ // OK if we add from 0, the remove call will use compute() and catch this race condition
+ valueInMap.referenceCount.getAndIncrement();
+
+ for (int i = 0; i < failures; ++i) {
+ ConcurrentUtil.backoff();
+ }
+ return valueInMap;
+ });
+
+ if (curr == (curr = valueInMap.compareAndExchange(curr, curr + 1L))) {
+ // acquired
+ ret2 = valueInMap;
+ break;
+ }
+ this.ensureTicketAdded(coord, referenceCountData);
+
+ ++failures;
+ }
+ return referenceCountData;
+ }
+ }
+
+ private static final class ReferenceCountData {
+
+ public final AtomicLong referenceCount = new AtomicLong(1L);
+ public volatile boolean addedTicket;
+
+ // returns false if reference count is 0, otherwise increments ref count
+ public boolean addCount() {
+ int failures = 0;
+ for (long curr = this.referenceCount.get();;) {
+ for (int i = 0; i < failures; ++i) {
+ Thread.onSpinWait();
+ }
+ } finally {
+ this.world.moonrise$getChunkTaskScheduler().chunkHolderManager.ticketLockArea.unlock(ticketLock);
+ }
+
+ if (processTicketUpdates) {
+ this.processTicketUpdates(coord);
+ if (curr == 0L) {
+ return false;
+ }
+
+ if (curr == (curr = this.referenceCount.compareAndExchange(curr, curr + 1L))) {
+ return true;
+ }
+
+ ++failures;
+ }
+ }
+
+ return ret2;
+ // returns true if new reference count is 0
+ public boolean decreaseReferenceCount() {
+ final long res = this.referenceCount.decrementAndGet();
+ if (res >= 0L) {
+ return res == 0L;
+ } else {
+ throw new IllegalStateException("Negative reference count");
+ }
+ }
+ }
+
Expand Down Expand Up @@ -2544,7 +2543,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ final ArrayDeque<ChunkBasedPriorityTask>[] queues = this.queues;
+ final int max = Priority.IDLE.priority;
+ ChunkBasedPriorityTask task = null;
+ AtomicLong referenceCounter = null;
+ ReferenceCountData referenceCounter = null;
+ synchronized (this) {
+ if (this.isDestroyed) {
+ throw new IllegalStateException("Attempting to poll from dead queue");
Expand Down Expand Up @@ -2576,16 +2575,19 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+
+ private static final class ChunkBasedPriorityTask implements PrioritisedExecutor.PrioritisedTask {
+
+ private static final AtomicLong REFERENCE_COUNTER_NOT_SET = new AtomicLong(-1L);
+ private static final ReferenceCountData REFERENCE_COUNTER_NOT_SET = new ReferenceCountData();
+ static {
+ REFERENCE_COUNTER_NOT_SET.referenceCount.set((long)Integer.MIN_VALUE);
+ }
+
+ private final WorldRegionTaskData world;
+ private final int chunkX;
+ private final int chunkZ;
+ private final long sectionLowerLeftCoord; // chunk coordinate
+ private final boolean isChunkTask;
+
+ private volatile AtomicLong referenceCounter;
+ private static final VarHandle REFERENCE_COUNTER_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "referenceCounter", AtomicLong.class);
+ private volatile ReferenceCountData referenceCounter;
+ private static final VarHandle REFERENCE_COUNTER_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "referenceCounter", ReferenceCountData.class);
+ private Runnable run;
+ private volatile Priority priority;
+ private static final VarHandle PRIORITY_HANDLE = ConcurrentUtil.getVarHandle(ChunkBasedPriorityTask.class, "priority", Priority.class);
Expand Down Expand Up @@ -2622,16 +2624,16 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ return (Priority)PRIORITY_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private void setReferenceCounterPlain(final AtomicLong value) {
+ private void setReferenceCounterPlain(final ReferenceCountData value) {
+ REFERENCE_COUNTER_HANDLE.set(this, value);
+ }
+
+ private AtomicLong getReferenceCounterVolatile() {
+ return (AtomicLong)REFERENCE_COUNTER_HANDLE.get(this);
+ private ReferenceCountData getReferenceCounterVolatile() {
+ return (ReferenceCountData)REFERENCE_COUNTER_HANDLE.get(this);
+ }
+
+ private AtomicLong compareAndExchangeReferenceCounter(final AtomicLong expect, final AtomicLong update) {
+ return (AtomicLong)REFERENCE_COUNTER_HANDLE.compareAndExchange(this, expect, update);
+ private ReferenceCountData compareAndExchangeReferenceCounter(final ReferenceCountData expect, final ReferenceCountData update) {
+ return (ReferenceCountData)REFERENCE_COUNTER_HANDLE.compareAndExchange(this, expect, update);
+ }
+
+ private void executeInternal() {
Expand All @@ -2648,7 +2650,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+
+ private boolean tryComplete(final boolean cancel) {
+ int failures = 0;
+ for (AtomicLong curr = this.getReferenceCounterVolatile();;) {
+ for (ReferenceCountData curr = this.getReferenceCounterVolatile();;) {
+ if (curr == null) {
+ return false;
+ }
Expand Down Expand Up @@ -2697,7 +2699,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ return false;
+ }
+
+ final AtomicLong referenceCounter = this.world.incrementReference(this.sectionLowerLeftCoord);
+ final ReferenceCountData referenceCounter = this.world.incrementReference(this.sectionLowerLeftCoord);
+ if (this.compareAndExchangeReferenceCounter(REFERENCE_COUNTER_NOT_SET, referenceCounter) != REFERENCE_COUNTER_NOT_SET) {
+ // we don't expect race conditions here, so it is OK if we have to needlessly reference count
+ this.world.decrementReference(referenceCounter, this.sectionLowerLeftCoord);
Expand Down Expand Up @@ -2747,7 +2749,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ }
+ }
+
+ private AtomicLong trySetCompleting(final int minPriority) {
+ private ReferenceCountData trySetCompleting(final int minPriority) {
+ // first, try to set priority to EXECUTING
+ for (Priority curr = this.getPriorityVolatile();;) {
+ if (curr.isLowerPriority(minPriority)) {
Expand All @@ -2759,7 +2761,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ } // else: continue
+ }
+
+ for (AtomicLong curr = this.getReferenceCounterVolatile();;) {
+ for (ReferenceCountData curr = this.getReferenceCounterVolatile();;) {
+ if (curr == null) {
+ // something acquired before us
+ return null;
Expand All @@ -2772,14 +2774,15 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ if (curr != (curr = this.compareAndExchangeReferenceCounter(curr, null))) {
+ continue;
+ }
+
+ return curr;
+ }
+ }
+
+ private void updatePriorityInQueue() {
+ boolean synchronise = false;
+ for (;;) {
+ final AtomicLong referenceCount = this.getReferenceCounterVolatile();
+ final ReferenceCountData referenceCount = this.getReferenceCounterVolatile();
+ if (referenceCount == REFERENCE_COUNTER_NOT_SET || referenceCount == null) {
+ // cancelled or not queued
+ return;
Expand All @@ -2798,6 +2801,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+ if (queue == null) {
+ if (!synchronise) {
+ // may be incorrectly null when unsynchronised
+ synchronise = true;
+ continue;
+ }
+ // must have been removed
Expand Down Expand Up @@ -2873,7 +2877,7 @@ index 0000000000000000000000000000000000000000..3de00bffe9b54f38e7c8ce30ba738983
+
+ @Override
+ public boolean setPriorityAndSubOrder(final Priority priority, final long subOrder) {
+ throw new UnsupportedOperationException();
+ return this.setPriority(priority);
+ }
+
+ @Override
Expand Down Expand Up @@ -3887,7 +3891,7 @@ index 0000000000000000000000000000000000000000..74ac328bf8d5f762f7060a6c5d49089d
+}
diff --git a/src/main/java/io/papermc/paper/threadedregions/ThreadedRegionizer.java b/src/main/java/io/papermc/paper/threadedregions/ThreadedRegionizer.java
new file mode 100644
index 0000000000000000000000000000000000000000..ce388e0ef231d7d73f75f5778c58eb40f6402f0f
index 0000000000000000000000000000000000000000..8e1b1df1c889d9235b10b86fc4cedbc06b7885c2
--- /dev/null
+++ b/src/main/java/io/papermc/paper/threadedregions/ThreadedRegionizer.java
@@ -0,0 +1,1405 @@
Expand Down Expand Up @@ -4878,7 +4882,7 @@ index 0000000000000000000000000000000000000000..ce388e0ef231d7d73f75f5778c58eb40
+
+ return this.state == STATE_READY;
+ } catch (final Throwable throwable) {
+ LOGGER.error("Failed to acquire region " + this, throwable);
+ LOGGER.error("Failed to release region " + this, throwable);
+ SneakyThrow.sneaky(throwable);
+ return false; // unreachable
+ } finally {
Expand Down

0 comments on commit e160105

Please sign in to comment.