-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Asyncio #210
Merged
Merged
Asyncio #210
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
michaelweiser
force-pushed
the
asyncio
branch
2 times, most recently
from
April 6, 2022 09:42
3ec2736
to
693cbfe
Compare
Currently Peekaboo uses classic threading for the work queue and toolbox modules, particularly Cuckoo and Cortex job trackers. The server now uses asyncio. This combination throws up problems such as signal handling needing to be done by the event loop to work reliably but the event loop getting blocked by toolbox module startup not being async. Instead of working around these symptoms we fully switch to asyncio: - Threads can become asyncio Tasks. - Shutdown threading.Events can be replaced by simply cancelling the tasks. - Locking mostly goes away. The Queue still needs to lock accesses to the duplicates logs at least where asyncio awaitables are awaited and may yield from the critical section modifying the log. - sqlite (still serialised through a 1-connection QueuePool) gives no deadlocks/database is locked errors any more. This state works but has a number of issues that are to be solved in succeeding commits: - Because of the elimination of threads we no longer scale beyond one CPU core. - requests still needs replacing by aiohttp to become non-blocking. - We only get to see any exceptions thrown in tasks created by asyncio.ensure_future() when retrieving their result with await on close_down(), i.e. on shutdown. Handing all tasks to a single asyncio.gather() in the daemon might remedy that.
This change restores multiprocessing and scaling beyond a single CPU core by using a concurrent.futures.ThreadPoolExecutor. This allows running blocking code in a separate thread while internally using a queue to implement a pool - basically what our Queue was doing before asyncio. We add this to the sample for calculating the sha256sum and dumping processing info and the ruleset where we use it to delegate oletools and filetools to executors.
The Cuckoo toolbox module uses requests which does not interact well with the asyncio event loop. These changes make the Cuckoo module use aiohttp instead of requests. For retries we switch to tenacity.
The Cortex toolbox module used to use cortex4py and was recently converted to requests. Both do not interact well with the asyncio event loop. Therefore we switch to aiohttp instead of requests. Retries are implemented using tenacity instead of a requests Retry policy.
Python 3.6 does not have asyncio.get_running_loop(). At the time, calling asyncio.get_event_loop() was the only way to get at the running event loop. Unfortunately this has the side-effect of silently starting a new loop if there is none. We want to know if that were to happen, either through an exception or follow-up errors. So we intentionally use asyncio.get_running_loop. Now we monkey-patch it into asyncio if not present based on the code of Python 3.8.
Adjust the testsuite (again) to the asyncio changes.
Our running of workers, job trackers and the cluster duplicate handler as asyncio tasks shows any unexpected error conditions by way of exceptions only on shutdown. This was also the case with classic threading and threads (and now tasks) would die silently, stop functioning and even prevent regular shutdown. To finally remedy this situation we make all startup routines that create asyncio tasks return those tasks so we can build a list of awaitables from which to gather any error conditions immediately when they happen. A single call to asyncio.gather with these awaitables now becomes the central serialisation point in the daemon instead of PeekabooServer.serve(). The latter doesn't even make it into the list of awaitables since asyncio servers in general and Sanic servers in particular run fully enclosed in the event loop and do not expose any means to gather their error conditions. Here we need to continue to rely on Sanic's built-in error handling and logging (for now). This restructuring now also allows us to fully start up the server before reporting readiness to systemd. Logging is restructured and extended to make every component report startup and shutdown in a similar fashion.
michaelweiser
force-pushed
the
asyncio
branch
2 times, most recently
from
May 25, 2022 12:50
4189e79
to
51ccbd4
Compare
Jack28
approved these changes
Jun 1, 2022
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
These changes switch the rest of Peekaboo beyond the sanic server to asyncio to avoid the symptoms stemming from interaction of threading and asyncio we've seen (e.g. #205). Multiprocessing is maintained through threadpool.
This is meant for discussion and inclusion early in the 2.2 cycle to give us time for testing and improvements.