Skip to content

Commit

Permalink
Merge pull request #16 from Joshuaalbert/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
Joshuaalbert authored Aug 11, 2023
2 parents d835dfc + 59e04bb commit 9b837dc
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 23 deletions.
15 changes: 7 additions & 8 deletions fair_async_rlock/fair_async_rlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@ async def acquire(self):
self._owner = me
self._count = 1
except asyncio.CancelledError:
self._queue.remove(event)
try: # if in queue, then cancelled before release
self._queue.remove(event)
except ValueError: # otherwise, release happened, this was next, and we simulate passing on
self._owner = me
self._count = 1
self._current_task_release()
raise

def _current_task_release(self):
Expand Down Expand Up @@ -73,10 +78,4 @@ async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
try:
self.release()
except asyncio.CancelledError as e:
if self.is_owner():
# If a cancellation happened during release, we force the current task to release.
self._current_task_release()
raise e
self.release()
94 changes: 80 additions & 14 deletions fair_async_rlock/tests/test_fair_async_rlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,21 +360,30 @@ async def succeeding_task():
async def test_task_cancellation():
# We need to verify that if a task is cancelled while waiting for the lock, it gets removed from the queue.
lock = FairAsyncRLock()
t1_ac = asyncio.Event()
t1_done = asyncio.Event()
t2_ac = asyncio.Event()

async def task1():
await lock.acquire()
await asyncio.sleep(0.1) # Let's ensure the lock is held for a bit
lock.release()
async with lock:
t1_ac.set()
await t1_done.wait()

async def task2():
await lock.acquire()
await t1_ac.wait()
async with lock:
t2_ac.set()
await asyncio.sleep(1.) # Let's ensure the lock is held for a bit


task1 = asyncio.create_task(task1())
task2 = asyncio.create_task(task2())
await asyncio.sleep(0) # Yield control to allow tasks to start
await asyncio.sleep(0.1) # Yield control to allow tasks to start
task2.cancel()
with pytest.raises(asyncio.CancelledError):
await task2
assert not t2_ac.is_set() # shouldn't acquire
t1_done.set() # Let T1 finish
await task1 # Ensure task1 has a chance to release the lock
# Ensure that lock is not owned and queue is empty after cancellation
assert lock._owner is None
Expand Down Expand Up @@ -477,7 +486,6 @@ async def monitor_func():
if not task_to_cancel.done():
task_to_cancel.cancel()


# Create tasks
for i in range(num_tasks):
tasks.append(asyncio.create_task(task_func(i)))
Expand All @@ -502,13 +510,6 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await super().__aexit__(exc_type, exc_val, exc_tb)


class ExceptionFairAsyncRLock(FairAsyncRLock):
def release(self):
"""Release the lock"""
raise asyncio.CancelledError()
super().release()


@pytest.mark.asyncio
async def test_delayed_release():
lock = DelayedFairAsyncRLock()
Expand All @@ -531,8 +532,15 @@ async def second_task():
assert t2.result() is True, "Second task should acquire the lock after the first task has released it"


class ExceptionFairAsyncRLock(FairAsyncRLock):
def release(self):
"""Release the lock"""
raise asyncio.CancelledError()
super().release()


@pytest.mark.asyncio
async def test_exception_on_release_gh7():
async def _test_exception_on_release_gh7():
lock = ExceptionFairAsyncRLock()

async def task():
Expand All @@ -543,3 +551,61 @@ async def task():
await asyncio.create_task(task())
assert lock._owner is None, "Lock owner should be None after an exception"
assert len(lock._queue) == 0, "Lock queue should be empty after an exception"


@pytest.mark.asyncio
async def test_fair_async_rlock_deadlock_scenario_regression_gh14():
lock = FairAsyncRLock()

# Use events to control the order of execution
task1_aquired = asyncio.Event()
task2_started = asyncio.Event()
task3_started = asyncio.Event()
task3_acquired = asyncio.Event()
task4_acquired = asyncio.Event()
task4_started = asyncio.Event()
task1_done = asyncio.Event()

async def task1():
async with lock:
task1_aquired.set()
await task2_started.wait()
await task3_started.wait() # wait until Task 3 in queue too
await task4_started.wait() # wait until Task 4 in queue too
await asyncio.sleep(0.1) # make sure Task 4 gets in queue
task1_done.set() # signal done before release, so Task 2 can cancel Task 3

async def task2():
await task1_aquired.wait()
task2_started.set()
await task1_done.wait() # Wait until Task 1 done then cancel Task 3
t3.cancel()

async def task3():
await task2_started.wait()
task3_started.set()
async with lock: # now in queue, waiting
task3_acquired.set() # Should not get reached, because Task 3 will be cancelled always

async def task4():
await task3_started.wait()
await asyncio.sleep(0.1) # make sure Task 3 gets in queue first
task4_started.set()
async with lock: # it's now in queue, just after Task 3, and waiting
task4_acquired.set() # Will get set if no bug

t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
t3 = asyncio.create_task(task3())
t4 = asyncio.create_task(task4())

await t1
await t2
with pytest.raises(asyncio.CancelledError):
await t3 # Here we get ValueError: <asyncio.locks.Event object at 0x7f419f7dba90 [set]> is not in deque
# Task 3 would never acquire
assert not task3_acquired.is_set()

# Task 4 should not deadlock. It should be able to acquire the locks
await asyncio.wait([t4], timeout=1)
assert task4_acquired.is_set()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
long_description = fh.read()

setup(name='fair_async_rlock',
version='1.0.5',
version='1.0.6',
description='A well-tested implementation of a fair asynchronous RLock for concurrent programming.',
long_description=long_description,
long_description_content_type="text/markdown",
Expand Down

0 comments on commit 9b837dc

Please sign in to comment.