Skip to content

Commit

Permalink
Keep task results if available and update logs/counter (#1559)
Browse files Browse the repository at this point in the history
  • Loading branch information
aarontp authored Oct 1, 2024
1 parent c290576 commit 252410c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 28 deletions.
30 changes: 16 additions & 14 deletions turbinia/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ def get_task(self, task_id: str) -> dict:
task_id (str): The ID of the stored task.
Returns:
task_dict (dict): Dict containing task attributes.
task_dict (dict): Dict containing task attributes.
"""
task_key = ':'.join(('TurbiniaTask', task_id))
task_dict = {}
Expand Down Expand Up @@ -265,7 +265,7 @@ def get_task_data(

def update_request_task(self, task) -> None:
"""Adds a Turbinia task to the corresponding request list.
Args:
task (TurbiniaTask): Turbinia task object.
"""
Expand Down Expand Up @@ -315,7 +315,9 @@ def write_new_task(self, task) -> Optional[str]:
Returns:
task_key Optional[str]: The key corresponding for the task.
"""
log.info(f'Writing metadata for new task {task.name:s} with id {task.id:s}')
log.info(
f'Writing metadata for new task {task.name:s} with id {task.id:s} '
f'and request ID {task.request_id}')
try:
task_key = self.redis_client.build_key_name('task', task.id)
except ValueError as exception:
Expand Down Expand Up @@ -359,7 +361,7 @@ def update_task_helper(self, task) -> Dict[str, Any]:

def update_task(self, task) -> Optional[str]:
"""Updates a Turbinia task key.
Args:
task: A TurbiniaTask object.
Expand Down Expand Up @@ -400,7 +402,7 @@ def write_evidence(self, evidence_dict: dict[str]) -> str:
Returns:
evidence_key (str): The key corresponding to the evidence in Redis
Raises:
TurbiniaException: If the attribute deserialization fails.
"""
Expand Down Expand Up @@ -443,7 +445,7 @@ def get_evidence_data(self, evidence_id: str) -> dict:
evidence_id (str): The ID of the stored evidence.
Returns:
evidence_dict (dict): Dict containing evidence attributes.
evidence_dict (dict): Dict containing evidence attributes.
"""
evidence_key = ':'.join(('TurbiniaEvidence', evidence_id))
evidence_dict = {}
Expand All @@ -462,7 +464,7 @@ def get_evidence_summary(
output (str): Output of the function (keys | content | count).
Returns:
summary (dict | list | int): Object containing evidences.
summary (dict | list | int): Object containing evidences.
"""
if output == 'count' and not group:
return sum(1 for _ in self.redis_client.iterate_keys('Evidence'))
Expand Down Expand Up @@ -494,7 +496,7 @@ def query_evidence(
output (str): Output of the function (keys | content | count).
Returns:
query_result (list | int): Result of the query.
query_result (list | int): Result of the query.
"""
keys = []
for evidence_key in self.redis_client.iterate_keys('Evidence'):
Expand All @@ -517,7 +519,7 @@ def get_evidence_key_by_hash(self, file_hash: str) -> str | None:
file_hash (str): The hash of the stored evidence.
Returns:
key (str | None): Key of the stored evidence.
key (str | None): Key of the stored evidence.
"""
try:
if file_hash:
Expand All @@ -533,7 +535,7 @@ def get_evidence_by_hash(self, file_hash: str) -> dict:
file_hash (str): The hash of the stored evidence.
Returns:
evidence_dict (dict): Dict containing evidence attributes.
evidence_dict (dict): Dict containing evidence attributes.
"""
evidence_id = self.get_evidence_key_by_hash(file_hash).split(':')[1]
return self.get_evidence_data(evidence_id)
Expand All @@ -544,12 +546,12 @@ def write_request(self, request_dict: dict, overwrite=False):
Args:
request_dict (dict[str]): A dictionary containing the serialized
request attributes that will be saved.
overwrite (bool): Allows overwriting previous key and blocks writing new
overwrite (bool): Allows overwriting previous key and blocks writing new
ones.
Returns:
request_key (str): The key corresponding to the evidence in Redis
Raises:
TurbiniaException: If the attribute deserialization fails or tried to
overwrite an existing key without overwrite=True
Expand Down Expand Up @@ -589,7 +591,7 @@ def get_request_data(self, request_id: str) -> dict:
request_id (str): The ID of the stored request.
Returns:
request_dict (dict): Dict containing request attributes.
request_dict (dict): Dict containing request attributes.
"""
request_key = self.redis_client.build_key_name('request', request_id)
request_dict = {}
Expand Down Expand Up @@ -637,7 +639,7 @@ def query_requests(
output (str): Output of the function (keys | content | count).
Returns:
query_result (list | int): Result of the query.
query_result (list | int): Result of the query.
"""
keys = []
for request_key in self.redis_client.iterate_keys('request'):
Expand Down
44 changes: 30 additions & 14 deletions turbinia/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@
turbinia_server_task_timeout_total = Counter(
'turbinia_server_task_timeout_total',
'Total number of Tasks that have timed out on the Server.')
turbinia_server_task_fail_total = Counter(
'turbinia_server_task_fail_total',
'Total number of Tasks that have failed according to the Server.')
turbinia_result_success_invalid = Counter(
'turbinia_result_success_invalid',
'The result returned from the Task had an invalid success status of None')
Expand Down Expand Up @@ -623,18 +626,30 @@ def close_failed_task(self, task):
Returns:
TurbiniaTask: The updated Task.
"""
result = workers.TurbiniaTaskResult(
request_id=task.request_id, no_output_manager=True,
no_state_manager=True)
result.setup(task)
if task.stub.traceback:
result.status = (
f'Task {task.id} failed with exception: {task.stub.traceback}')
else:
result.status = f'Task {task.id} failed.'
result.successful = False
result.closed = True
task.result = result
if not task.result:
result = workers.TurbiniaTaskResult(
request_id=task.request_id, no_output_manager=True,
no_state_manager=True)
result.setup(task)
task.result = result

if not task.result.status:
if task.stub.traceback:
task.result.status = (
f'Task {task.id} failed with exception: {task.stub.traceback}')
else:
task.result.status = f'Task {task.id} failed.'
# Record the traceback in the status if we have one otherwise it won't
# be visible for the user.
elif task.result.status and task.stub.traceback:
new_status = (
f'{task.result.status} (Task failure exception: '
f'{task.stub.traceback})')
task.result.status = new_status

task.result.successful = False
task.result.closed = True
turbinia_server_task_fail_total.inc()
return task

def timeout_task(self, task, timeout):
Expand Down Expand Up @@ -813,8 +828,9 @@ def get_evidence(self):

def enqueue_task(self, task, evidence_, timeout):
log.info(
f'Adding Celery task {task.name:s} with evidence {evidence_.name:s}'
f' to queue with base task timeout {timeout}')
f'Adding Celery task {task.name:s} for request id {task.request_id} '
f'with evidence {evidence_.name:s} to queue with base task '
f'timeout {timeout}')
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#task-time-limit
# Hard limit in seconds, the worker processing the task will be killed and
# replaced with a new one when this is exceeded.
Expand Down

0 comments on commit 252410c

Please sign in to comment.