Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync_to_async calls hang / deadlock when called in a task #348

Closed
brownan opened this issue Oct 19, 2022 · 11 comments · Fixed by #367
Closed

sync_to_async calls hang / deadlock when called in a task #348

brownan opened this issue Oct 19, 2022 · 11 comments · Fixed by #367

Comments

@brownan
Copy link

brownan commented Oct 19, 2022

I ran into a weird deadlock when writing a django async view and trying to use asyncio.wait_for() in conjunction with sync_to_async to call a sync function but timeout if it takes longer than a few seconds. Internally, wait_for() creates a couple of async tasks using create_task(). So calling create_task() on the return of sync_to_async hangs forever.

Here's a simple asgi application that reproduces the problem. I'm on asgiref 3.5.2, and I've tested this with both daphne and uvicorn. The swing between the async context and the sync context is meant to replicate the control flow used in django if you have a sync middleware in your middleware stack.

import asyncio

from asgiref.sync import sync_to_async, async_to_sync, ThreadSensitiveContext


def some_sync_function():
    print("Some_sync_function")


async def django_async_view():
    print("django_async_view")
    # Works:
    #await sync_to_async(some_sync_function)()

    # Doesn't work, times out:
    await asyncio.wait_for(sync_to_async(some_sync_function, thread_sensitive=True)(), timeout=5)

    # Also doesn't work, hangs forever:
    #await asyncio.create_task(sync_to_async(some_sync_function, thread_sensitive=True)())


def django_sync_middleware():
    print("django_sync_middleware")
    async_to_sync(django_async_view)()


async def application(scope, receive, send):

    # Gets a "Single thread executor already being used, would deadlock" error
    #await sync_to_async(django_sync_middleware, thread_sensitive=True)()

    # Gets an asyncio.TimeoutError:
    async with ThreadSensitiveContext():
        await sync_to_async(django_sync_middleware, thread_sensitive=True)()

    await send(
        {
            "type": "http.response.start",
            "status": 200,
            "headers": [
                (b"content-type", b"text/plain"),
            ],
        }
    )
    await send(
        {
            "type": "http.response.body",
            "body": b"Hello, world!",
        }
    )

Am I doing something wrong? Or is this a bug?

@andrewgodwin
Copy link
Member

It's possible it's a bug - at the very least, if it's going to deadlock due to thread_sensitive it should probably raise an error rather than actually deadlocking.

When you say it "hangs forever", is it the actual create_task call that hangs, or is it that the task creates but never completes?

@brownan
Copy link
Author

brownan commented Oct 20, 2022

I debugged the async thread and stepped into the create_task() and that returned just fine. Next asyncio event loop iteration it calls into SyncToAsync.__call__ where it gets the thread pool from the current thread sensitive context via self.context_to_thread_executor, as expected. The function gets submitted to the executor just fine. The executor already had a thread and so didn't need to spin up a new one. But it seems the future representing the thread executor call never returns. I'm still debugging to see if I can figure out why.

@brownan
Copy link
Author

brownan commented Oct 20, 2022

Actually that's not as expected, is it. The second call to sync_to_async should be finding the sync thread's CurrentThreadExecutor, not the thread sensitive ThreadPoolExecutor from the context, shouldn't it?

@andrewgodwin
Copy link
Member

Yes, if there's a nested sync_to_async call within another, it should be propagating up and using the thread of its parent so that there's no deadlock - there's a whole weird mechanism for that. I wonder the context changes caused a bug in that area somehow.

@jthorniley
Copy link
Contributor

Hi, hope I'm not hijacking the thread, I think I'm running into the same thing.

I think the problem is that the async_to_sync/sync_to_async tools rely on asgiref.locals.Local to store the right executor above them in the call stack to run sync code on. The problem (I think) is that the asgiref Local() only propagates through plain awaits and does not propagate through new tasks created with asyncio.create_task or functions like wait_for or gather that have to separately create tasks behind the scenes. I've written a script to test the behaviour: https://gist.github.com/jthorniley/6574aa84691cca2c42b3ab14fe8cb42b

For me this outputs:

sys.version_info(major=3, minor=10, micro=7, releaselevel='final', serial=0)
asgiref version: 3.5.2
in caller_function, settings local to 'foo'
in caller_function, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, local is foo
in caller_function_gather, setting local to 'bar'
in caller_function_gather, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, local is not set
in caller_function_wait_for, setting local to 'bat'
in caller_function_wait_for, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, local is not set
in caller_function_create_task, setting local to 'bat'
in caller_function_create_task, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, thread is <_MainThread(MainThread, started 8703130240)>
  in inner, local is not set

In the context of Django, I think the problem happens when:

  1. Run django server as asgi, so main server thread has a loop runnning and can't be used for sync code.
  2. Sync middleware - sync_to_async called but there's no sync thread above it, so creates a new sync thread to run sync code in, stores reference to ThreadPoolExecutor in SyncToAsync.context_to_thread_executor
  3. Async view - async_to_sync called from sync thread. Creates a CurrentThreadExecutor and stores in asgiref Local AsyncToSync.executors.current, so that inner sync_to_asyncs can find it. Then creates a new thread for the inner async code (there are now three threads. Django-main is running async loop but idle until something happens. Thread-1 is running the CurrentThreadExecutor instance which waits for the code inside async_to_sync to either finish or schedule tasks for it to run via sync_to_async. Thread-2 is going to run the async view code.
  4. Sync code inside async view:
    1. If we just await sync_to_async(fn)(), this sees AsyncToSync.executors.current has a value and uses the "CurrentThreadExecutor" to run it in the original sync thread from step 2 (definitely the desired behaviour).
    2. If we use asyncio.create_task and friends (but still await it inside the view code), sync_to_async can't see AsyncToSync.executors.current and falls through to the backup, which is the SyncToAsync.context_to_thread_executor created by the first call to SyncToAsync in step 2 above. BUT this executor is a plain ThreadPoolExecutor, with max_workers = 1 and is already running a CurrentThreadExecutor which AsyncToSync left running there to handle the inner sync code. Since the ThreadPoolExecutor is already running the only worker it can, and that worker is blocked until the outer async_to_sync finishes, we have a deadlock.

I think the question is: should either the behaviour of "local" be different - should it be maintained across create_task, or should the behaviour of SyncToAsync be different: There is a further fall through option in the SyncToAsync code based on AsyncToSync.loop_thread_executors which can look up the correct CurrentThreadExecutor for the running loop even if the current local is not set. This seems to be its intention:

    # When we can't find a CurrentThreadExecutor from the context, such as
    # inside create_task, we'll look it up here from the running event loop.
    loop_thread_executors: "Dict[asyncio.AbstractEventLoop, CurrentThreadExecutor]" = {}

I think this would fix the deadlock, but the option is a later fallback in SyncToAsync than the thread_sensitive_context based one that finds the blocked ThreadPoolExecutor. Basically swap the elifs on lines 398-414 of asgiref/sync.py

@carltongibson
Copy link
Member

carltongibson commented Jan 11, 2023

Thanks both @brownan and @jthorniley for the examples. Very nice. (@brownan this is was minimal reproduce you promised elsewhere, on the Channels issue, I suspect 😅 — I've only now had a chance to read it fully.)

I'm planning a round of Channels projects updates for this period coming up, so I'll make time to look at this too — but please do input if you have thoughts, as I have this in the 🤯-bucket… :)

@jthorniley Your numbered-point description sounds quite plausible. I need to sit down and verify that. I can't respond instantly to your "...I think the question is…" — I just wanted to reply to let you give you some response. (An ACK if you will.)

@brownan
Copy link
Author

brownan commented Jan 11, 2023

thanks for the analysis @jthorniley. @carltongibson I had originally thought this was a different issue than the one I reported elsewhere, but maybe not. I didn't have the time to dig into it further when I submitted that (and this) issue.

@jthorniley
Copy link
Contributor

@carltongibson thanks for the quick reply. I'm sure its something that needs some examination. In the hope that it helps I've created a pull request using the change to sync_to_async as the solution - however I'm not proposing that that is necessarily the correct solution just thought it might be helpful to experiment! I've included a test that reproduces the issue when run against the current main branch.

#367

@carltongibson
Copy link
Member

carltongibson commented Jan 12, 2023

In @brownan's example here we're entering the # Re-use thread executor in current context block:

asgiref/asgiref/sync.py

Lines 398 to 411 in 3602263

elif self.thread_sensitive_context and self.thread_sensitive_context.get(
None
):
# If we have a way of retrieving the current context, attempt
# to use a per-context thread pool executor
thread_sensitive_context = self.thread_sensitive_context.get()
if thread_sensitive_context in self.context_to_thread_executor:
# Re-use thread executor in current context
executor = self.context_to_thread_executor[thread_sensitive_context]
else:
# Create new thread executor in current context
executor = ThreadPoolExecutor(max_workers=1)
self.context_to_thread_executor[thread_sensitive_context] = executor

... but we're not setting the deadlock context.

Adding the equivalent checks to the non-context cases allows raising the would deadlock error, without leading to any test failures.

Refs #348 (comment)

@HMaker
Copy link

HMaker commented Aug 27, 2023

I have the same issue. Tried to wrap with asyncio.wait_for() a db query decorated by sync_to_async(). It hang out, but in some cases the query ran on a new thread sensitive context. The initial context, created by the django's ASGIHandler, was "bypassed". I expect all thread sensitive sync_to_async() code to be run in the same thread of the active context.

I think the sensitive context is not being properly inherited by tasks created by asyncio.create_task().

@ttys0dev
Copy link
Contributor

I'm thinking the issue demonstrated in #432 may be related to this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: Done
Development

Successfully merging a pull request may close this issue.

6 participants