daemon: Gather up and exit on unexpected errors
Running workers, job trackers and the cluster duplicate handler as
asyncio tasks meant that unexpected error conditions would show up only
on shutdown. We now make all startup routines that create asyncio tasks
return those tasks so we can build a list of awaitables from which to
gather any error conditions. A single call to asyncio.gather() with
these awaitables becomes the central serialisation point in the daemon
instead of PeekabooServer.serve().

The latter doesn't even make it into the list of awaitables since
asyncio servers in general and Sanic servers in particular run fully
enclosed in the event loop and do not expose any means to gather their
error conditions. Here we need to continue to rely on Sanic's built-in
error handling and logging (for now).

This restructuring now also allows us to fully start up the server
before reporting readiness to systemd.

Logging is restructured and extended to make every component report
startup and shutdown in a similar fashion.
michaelweiser committed Apr 4, 2022
1 parent 4001e27 commit 3ec2736
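
To illustrate the pattern this commit introduces, here is a minimal,
self-contained sketch of startup routines handing their tasks back and a
single asyncio.gather() acting as the daemon's serialisation point. The
coroutine names are hypothetical stand-ins, not Peekaboo APIs; Python
3.7+ assumed.

import asyncio

async def worker_loop(wid):
    # hypothetical stand-in for a Worker.run() coroutine
    for _ in range(3):
        await asyncio.sleep(0.1)
    if wid == 1:
        raise RuntimeError("worker %d: unexpected failure" % wid)

async def main():
    # startup routines hand their tasks back to the caller
    awaitables = [asyncio.ensure_future(worker_loop(wid))
                  for wid in range(3)]

    # central serialisation point: returns as soon as any task raises
    # or once all of them have finished
    try:
        await asyncio.gather(*awaitables)
    except asyncio.CancelledError:
        # expected when a signal handler cancels the tasks on shutdown
        pass
    except Exception as error:
        print("shutting down due to unexpected exception: %s" % error)

asyncio.run(main())

With gather()'s default return_exceptions=False the first exception
propagates immediately; gather() does not cancel the sibling tasks,
which is why the daemon then explicitly triggers shut_down() on the
remaining components.
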
Showing 6 changed files with 79 additions and 27 deletions.
31 changes: 24 additions & 7 deletions peekaboo/daemon.py
@@ -329,6 +329,10 @@ async def async_main():
threadpool = concurrent.futures.ThreadPoolExecutor(
config.worker_count, 'ThreadPool-')

# collect a list of awaitables from started subsystems from which to gather
# unexpected error conditions such as exceptions
awaitables = []

# read in the analyzer and ruleset configuration and start the job queue
try:
ruleset_config = PeekabooConfigParser(config.ruleset_config)
@@ -339,7 +343,7 @@ async def async_main():
cluster_duplicate_check_interval=cldup_check_interval,
threadpool=threadpool)
sig_handler.register_listener(job_queue)
await job_queue.start()
awaitables.extend(await job_queue.start())
except PeekabooConfigException as error:
logging.critical(error)
sys.exit(1)
@@ -357,6 +361,10 @@ async def async_main():
sample_factory=sample_factory,
request_queue_size=100,
db_con=db_con)
sig_handler.register_listener(server)
# the server runs completely inside the event loop and does not expose
# any awaitable to extract exceptions from.
await server.start()
except Exception as error:
logger.critical('Failed to start Peekaboo Server: %s', error)
job_queue.shut_down()
@@ -367,16 +375,25 @@ async def async_main():
if sig_handler.shutdown_requested:
sys.exit(0)

sig_handler.register_listener(server)
SystemdNotifier().notify("READY=1")
await server.serve()

# trigger shutdowns of other components (if not already ongoing triggered
# by e.g. the signal handler), server will already be shut down at this
# point signaled by the fact that serve() above returned
job_queue.shut_down()
try:
await asyncio.gather(*awaitables)
# CancelledError is derived from BaseException, not Exception
except asyncio.exceptions.CancelledError:
# cancellation is expected in the case of shutdown via signal handler
pass
except Exception:
logger.error("Shutting down due to unexpected exception")

# trigger shutdowns of other components if not already ongoing, e.g.
# triggered by the signal handler
if not sig_handler.shutdown_requested:
server.shut_down()
job_queue.shut_down()

# close down components after they've shut down
await server.close_down()
await job_queue.close_down()

# do a final cleanup pass through the database
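
The separate except clause above is needed because, since Python 3.8,
asyncio.CancelledError derives from BaseException rather than Exception.
A quick hypothetical illustration:

import asyncio

async def main():
    task = asyncio.ensure_future(asyncio.sleep(10))
    await asyncio.sleep(0)  # let the task get scheduled
    task.cancel()
    try:
        await asyncio.gather(task)
    except asyncio.CancelledError:
        # reached: a bare "except Exception" would not catch this,
        # as CancelledError is a BaseException subclass since 3.8
        print("task cancellation surfaced")

asyncio.run(main())
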
34 changes: 22 additions & 12 deletions peekaboo/queuing.py
@@ -102,18 +102,19 @@ def __init__(self, ruleset_config, db_con, analyzer_config,

async def start(self):
""" Start up the job queue including resource initialisation. """
awaitables = []
for worker in self.workers:
await worker.start()
awaitables.append(await worker.start())

if self.cluster_duplicate_handler:
await self.cluster_duplicate_handler.start()
awaitables.append(await self.cluster_duplicate_handler.start())

# create a single ruleset engine for all workers, instantiates all the
# rules based on the ruleset configuration, may start up long-lived
# analyzer instances which are shared as well, is otherwise stateless
# to allow concurrent use by multiple workers
try:
await self.ruleset_engine.start()
awaitables.extend(await self.ruleset_engine.start())
except (KeyError, ValueError, PeekabooConfigException) as error:
self.shut_down()
await self.close_down()
@@ -124,6 +125,8 @@ async def start(self):
await self.close_down()
raise PeekabooConfigException(error)

return awaitables

async def submit(self, sample):
"""
Adds a Sample object to the job queue.
@@ -326,8 +329,6 @@ def shut_down(self):

async def close_down(self):
""" Wait for workers to stop and free up resources. """
logger.info("Closing down.")

for worker in self.workers:
await worker.close_down()

@@ -337,6 +338,9 @@ async def close_down(self):
if self.ruleset_engine is not None:
await self.ruleset_engine.close_down()

logger.info("Queue shut down.")


class ClusterDuplicateHandler:
""" A housekeeper handling submission and cleanup of cluster duplicates.
"""
@@ -349,6 +353,7 @@ def __init__(self, job_queue, interval=5):
async def start(self):
self.task = asyncio.ensure_future(self.run())
self.task.set_name(self.task_name)
return self.task

async def run(self):
logger.debug("Cluster duplicate handler started.")
@@ -359,14 +364,9 @@ async def run(self):
logger.debug("Checking for samples in processing by other "
"instances to submit")

# TODO: Error handling: How do we cause Peekaboo to exit with an
# error from here? For now just keep trying and hope (database)
# failure is transient.
await self.job_queue.clear_stale_in_flight_samples()
await self.job_queue.submit_cluster_duplicates()

logger.debug("Cluster duplicate handler shut down.")

def shut_down(self):
""" Asynchronously initiate cluster duplicate handler shutdown. """
logger.debug("Cluster duplicate handler shutdown requested.")
@@ -382,6 +382,11 @@ async def close_down(self):
# we cancelled the task so a CancelledError is expected
except asyncio.CancelledError:
pass
except Exception:
logger.exception(
"Unexpected exception in cluster duplicate handler")

logger.debug("Cluster duplicate handler shut down.")


class Worker:
@@ -398,6 +403,7 @@ def __init__(self, wid, job_queue, ruleset_engine, db_con):
async def start(self):
self.task = asyncio.ensure_future(self.run())
self.task.set_name(self.worker_name)
return self.task

async def run(self):
while True:
@@ -438,10 +444,9 @@ async def run(self):

await self.job_queue.done(sample)

logger.info('Worker %d: Stopped', self.worker_id)

def shut_down(self):
""" Asynchronously initiate worker shutdown. """
logger.info("Worker %d: shutdown requested.", self.worker_id)
if self.task is not None:
self.task.cancel()

@@ -454,3 +459,8 @@ async def close_down(self):
# we cancelled the task so a CancelledError is expected
except asyncio.CancelledError:
pass
except Exception:
logger.exception(
"Unexpected exception in worker %d", self.worker_id)

logger.info('Worker %d: Stopped', self.worker_id)
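
The Worker and ClusterDuplicateHandler now share one lifecycle shape:
start() creates and returns the task, shut_down() cancels it, and
close_down() awaits it and logs anything unexpected. A condensed,
hypothetical sketch of that shape (Task.set_name() requires Python
3.8+):

import asyncio
import logging

logger = logging.getLogger(__name__)

class TaskComponent:
    # hypothetical condensation of the Worker/handler lifecycle
    def __init__(self, name):
        self.name = name
        self.task = None

    async def run(self):
        while True:
            await asyncio.sleep(1)  # stand-in for the actual work loop

    async def start(self):
        # hand the task back so the daemon can gather() on it
        self.task = asyncio.ensure_future(self.run())
        self.task.set_name(self.name)
        return self.task

    def shut_down(self):
        # asynchronously initiate shutdown by cancelling the task
        if self.task is not None:
            self.task.cancel()

    async def close_down(self):
        # wait for the task to actually finish and surface surprises
        try:
            await self.task
        except asyncio.CancelledError:
            pass  # expected: we cancelled the task ourselves
        except Exception:
            logger.exception("Unexpected exception in %s", self.name)
        logger.info("%s: stopped", self.name)
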
16 changes: 14 additions & 2 deletions peekaboo/ruleset/engine.py
@@ -128,6 +128,7 @@ async def start(self):
# instantiate enabled rules and have them check their configuration,
# user-defined rule order is preserved in enabled_rules and through
# ordered append() in self.rules
awaitables = []
for rule_name in enabled_rules:
rule = rule_classes[rule_name](
self.config, self.db_con, self.threadpool)
@@ -146,10 +147,13 @@
self.analyzer_config.cuckoo_submit_original_filename,
self.analyzer_config.cuckoo_maximum_job_age)

if not await self.cuckoo.start_tracker():
awaitable = await self.cuckoo.start_tracker()
if not awaitable:
raise PeekabooRulesetConfigError(
"Failure to initialize Cuckoo job tracker")

awaitables.append(awaitable)

rule.set_cuckoo_job_tracker(self.cuckoo)

if rule.uses_cortex:
@@ -166,10 +170,13 @@
self.analyzer_config.cortex_submit_original_filename,
self.analyzer_config.cortex_maximum_job_age)

if not await self.cortex.start_tracker():
awaitable = await self.cortex.start_tracker()
if not awaitable:
raise PeekabooRulesetConfigError(
"Failure to initialize Cortex job tracker")

awaitables.append(awaitable)

rule.set_cortex_job_tracker(self.cortex)

self.rules.append(rule)
@@ -184,6 +191,8 @@
if self.shutdown_requested:
self.shut_down_resources()

return awaitables

async def run(self, sample):
""" Run all the rules in the ruleset against a given sample
@@ -228,6 +237,7 @@ def shut_down_resources(self):
def shut_down(self):
""" Initiate asynchronous shutdown of the ruleset engine and dependent
logic such as job trackers. """
logger.debug("Ruleset engine shutdown requested.")
self.shutdown_requested = True
self.shut_down_resources()

@@ -238,3 +248,5 @@ async def close_down(self):

if self.cortex is not None:
await self.cortex.close_down()

logger.debug("Ruleset engine shut down.")
13 changes: 9 additions & 4 deletions peekaboo/server.py
@@ -269,8 +269,8 @@ async def report(self, _, job_id):
# 'report': report,
}, 200)

async def serve(self):
""" Serves requests until shutdown is requested from the outside. """
async def start(self):
""" Makes the server start serving requests. """
self.server = await self.server_coroutine

# sanic 21.9 introduced an explicit startup that finalizes the app,
@@ -281,12 +281,17 @@ async def serve(self):
await self.server.start_serving()
logger.info('Peekaboo server is now listening on %s:%d',
self.host, self.port)
await self.server.wait_closed()
logger.debug('Server shut down.')

def shut_down(self):
""" Triggers a shutdown of the server, used by the signal handler and
potentially other components to cause the main loop to exit. """
logger.debug('Server shutdown requested.')
if self.server is not None:
self.server.close()

async def close_down(self):
""" Wait for the server to close down. """
if self.server is not None:
await self.server.wait_closed()

logger.debug('Server shut down.')
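
PeekabooServer.serve() is split into start(), shut_down() and
close_down() so the daemon can bring the server fully up before
notifying systemd and tear it down alongside the other components. A
rough sketch of the same lifecycle using plain asyncio.start_server()
for illustration (Sanic's AsyncioServer wraps an equivalent
close()/wait_closed() pair):

import asyncio

class MiniServer:
    # illustration only; not the Sanic-based PeekabooServer
    def __init__(self, host='127.0.0.1', port=8100):
        self.host = host
        self.port = port
        self.server = None

    async def handle(self, reader, writer):
        writer.close()  # trivial connection handler

    async def start(self):
        # returns with the socket already listening, so only now can
        # the caller safely report READY=1 to systemd
        self.server = await asyncio.start_server(
            self.handle, self.host, self.port)

    def shut_down(self):
        # stop accepting new connections, begin shutdown
        if self.server is not None:
            self.server.close()

    async def close_down(self):
        # wait for the server to finish closing down
        if self.server is not None:
            await self.server.wait_closed()

Since the server task stays inside the event loop and is not part of the
gathered awaitables, errors inside requests remain with Sanic's built-in
error handling, as the commit message notes.
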
6 changes: 5 additions & 1 deletion peekaboo/toolbox/cortex.py
@@ -769,7 +769,7 @@ async def start_tracker(self):
""" Start tracking running jobs in a separate thread. """
self.tracker = asyncio.ensure_future(self.track())
self.tracker.set_name("CortexJobTracker")
return True
return self.tracker

async def track(self):
""" Do the polling for finished jobs. """
@@ -858,5 +858,9 @@ async def close_down(self):
# we cancelled the task so a CancelledError is expected
except asyncio.CancelledError:
pass
except Exception:
logger.exception(
"Unexpected exception in Cortex job tracker")

await self.session.close()
logger.debug("Cortex job tracker shut down.")
6 changes: 5 additions & 1 deletion peekaboo/toolbox/cuckoo.py
@@ -339,7 +339,7 @@ async def start_tracker(self):
""" Start tracking running jobs in a separate thread. """
self.tracker = asyncio.ensure_future(self.track())
self.tracker.set_name("CuckooJobTracker")
return True
return self.tracker

async def track(self):
""" Do the polling for finished jobs. """
@@ -401,8 +401,12 @@ async def close_down(self):
# we cancelled the task so a CancelledError is expected
except asyncio.CancelledError:
pass
except Exception:
logger.exception(
"Unexpected exception in Cuckoo job tracker")

await self.session.close()
logger.debug("Cuckoo job tracker shut down.")


class CuckooReport:
