Skip to content

Commit

Permalink
Task status v2 (#1558)
Browse files Browse the repository at this point in the history
* Various updates to task management

* Updates

* Updates

* Updates to task manager

* Update unit test

* Minor updates to use celery states
  • Loading branch information
jleaniz authored Oct 1, 2024
1 parent 37889ef commit c290576
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 74 deletions.
10 changes: 5 additions & 5 deletions turbinia/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import json
import logging
import sys
from celery import states as celery_states
from datetime import datetime
from datetime import timedelta
from typing import Any, List, Dict, Optional
Expand Down Expand Up @@ -269,7 +270,6 @@ def update_request_task(self, task) -> None:
task (TurbiniaTask): Turbinia task object.
"""
request_key = self.redis_client.build_key_name('request', task.request_id)
task_key = self.redis_client.build_key_name('task', task.id)
try:
self.redis_client.add_to_list(request_key, 'task_ids', task.id)
request_last_update = datetime.strptime(
Expand All @@ -291,12 +291,12 @@ def update_request_task(self, task) -> None:
elif task.result.successful is False:
self.redis_client.add_to_list(request_key, 'failed_tasks', task.id)
statuses_to_remove.remove('failed_tasks')
task_status = self.redis_client.get_attribute(task_key, 'status')
if task_status and 'Task is running' in task_status:
task_status = task.celery_state
if task_status == celery_states.STARTED:
self.redis_client.add_to_list(request_key, 'running_tasks', task.id)
statuses_to_remove.remove('running_tasks')
elif (task_status is None or task_status == 'queued' or
task_status == 'pending'):
elif (task_status is None or task_status == celery_states.RECEIVED or
task_status == celery_states.PENDING):
self.redis_client.add_to_list(request_key, 'queued_tasks', task.id)
statuses_to_remove.remove('queued_tasks')
for status_name in statuses_to_remove:
Expand Down
32 changes: 24 additions & 8 deletions turbinia/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,14 @@ def _backend_setup(self, *args, **kwargs):
self.celery_runner = self.celery.app.task(
task_utils.task_runner, name='task_runner')

def _get_worker_name(self, celery_stub):
"""Gets the Celery worker name from the AsyncResult object."""
worker_name = celery_stub.result.get('hostname', None)
if worker_name:
# example hostname: celery@turbinia-hostname
worker_name = worker_name.split('@')[1]
return worker_name

def process_tasks(self):
"""Determine the current state of our tasks.
Expand All @@ -700,28 +708,33 @@ def process_tasks(self):
celery_task = task.stub
# ref: https://docs.celeryq.dev/en/stable/reference/celery.states.html
if not celery_task:
log.debug(f'Task {task.stub.task_id:s} not yet created.')
log.info(f'Task {task.stub.task_id:s} not yet created.')
check_timeout = True
elif celery_task.status == celery_states.STARTED:
# Task status will be set to running when the worker executes run_wrapper()
log.debug(f'Task {celery_task.id:s} not finished.')
log.warning(f'Task {celery_task.id:s} {task.id} started.')
task.worker_name = self._get_worker_name(celery_task)
task.celery_state = celery_states.STARTED
check_timeout = True
elif celery_task.status == celery_states.FAILURE:
log.warning(f'Task {celery_task.id:s} failed.')
log.info(f'Task {celery_task.id:s} failed.')
task.celery_state = celery_states.FAILURE
self.close_failed_task(task)
completed_tasks.append(task)
elif celery_task.status == celery_states.SUCCESS:
task.celery_state = celery_states.SUCCESS
task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result)
completed_tasks.append(task)
elif celery_task.status == celery_states.PENDING:
task.status = 'pending'
log.info(f'Task {celery_task.id:s} is pending.')
task.celery_state = celery_states.PENDING
check_timeout = True
log.debug(f'Task {celery_task.id:s} is pending.')
elif celery_task.status == celery_states.RECEIVED:
task.status = 'queued'
log.info(f'Task {celery_task.id:s} is queued.')
task.worker_name = self._get_worker_name(celery_task)
task.celery_state = celery_states.RECEIVED
check_timeout = True
log.debug(f'Task {celery_task.id:s} is queued.')
elif celery_task.status == celery_states.REVOKED:
task.celery_state = celery_states.REVOKED
message = (
f'Celery task {celery_task.id:s} associated with Turbinia '
f'task {task.id} was revoked. This could be caused if the task is '
Expand All @@ -746,6 +759,9 @@ def process_tasks(self):
task = self.timeout_task(task, timeout)
completed_tasks.append(task)

# Update task metadata so we have an up to date state.
self.state_manager.update_task(task)

outstanding_task_count = len(self.tasks) - len(completed_tasks)
if outstanding_task_count > 0:
log.info(f'{outstanding_task_count:d} Tasks still outstanding.')
Expand Down
1 change: 1 addition & 0 deletions turbinia/tcelery.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def setup(self):
task_default_queue=config.INSTANCE_ID,
# Re-queue task if Celery worker abruptly exists
task_reject_on_worker_lost=True,
task_track_started=True,
worker_cancel_long_running_tasks_on_connection_loss=True,
worker_concurrency=1,
worker_prefetch_multiplier=1,
Expand Down
47 changes: 8 additions & 39 deletions turbinia/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class TurbiniaTaskResult:

# The list of attributes that we will persist into storage
STORED_ATTRIBUTES = [
'worker_name', 'report_data', 'report_priority', 'run_time', 'status',
'saved_paths', 'successful', 'evidence_size'
'report_data', 'report_priority', 'run_time', 'status', 'saved_paths',
'successful', 'evidence_size', 'worker_name'
]

def __init__(
Expand Down Expand Up @@ -454,7 +454,7 @@ class TurbiniaTask:
STORED_ATTRIBUTES = [
'id', 'job_id', 'job_name', 'start_time', 'last_update', 'name',
'evidence_name', 'evidence_id', 'request_id', 'requester', 'group_name',
'reason', 'group_id', 'celery_id'
'reason', 'group_id', 'celery_id', 'celery_state', 'worker_name'
]

# The list of evidence states that are required by a Task in order to run.
Expand Down Expand Up @@ -483,6 +483,7 @@ def __init__(

self.id = uuid.uuid4().hex
self.celery_id = None
self.celery_state = None
self.is_finalize_task = False
self.job_id = None
self.job_name = None
Expand All @@ -508,6 +509,7 @@ def __init__(
self.group_name = group_name
self.reason = reason
self.group_id = group_id
self.worker_name = None

def serialize(self):
"""Converts the TurbiniaTask object into a serializable dict.
Expand Down Expand Up @@ -821,6 +823,7 @@ def setup(self, evidence):
TurbiniaException: If the evidence can not be found.
"""
self.setup_metrics()
self.state_manager = state_manager.get_state_manager()
self.output_manager.setup(self.name, self.id, self.request_id)
self.tmp_dir, self.output_dir = self.output_manager.get_local_output_dirs()
if not self.result:
Expand Down Expand Up @@ -1039,21 +1042,10 @@ def run_wrapper(self, evidence):

log.debug(f'Task {self.name:s} {self.id:s} awaiting execution')
failure_message = None
worker_name = platform.node()
self.worker_name = platform.node()
try:
evidence = evidence_decode(evidence)
self.result = self.setup(evidence)
# Call update_task_status to update status
# We cannot call update_task() here since it will clobber previously
# stored data by the Turbinia server when the task was created, which is
# not present in the TurbiniaTask object the worker currently has in its
# runtime.
self.update_task_status(self, 'queued')
# Beucase of the same reason, we perform a single attribute update
# for the worker name.
task_key = self.state_manager.redis_client.build_key_name('task', self.id)
self.state_manager.redis_client.set_attribute(
task_key, 'worker_name', json.dumps(worker_name))
turbinia_worker_tasks_queued_total.inc()
task_runtime_metrics = self.get_metrics()
except TurbiniaException as exception:
Expand Down Expand Up @@ -1108,9 +1100,6 @@ def run_wrapper(self, evidence):
self._evidence_config = evidence.config
self.task_config = self.get_task_recipe(evidence.config)
self.worker_start_time = datetime.now()
# Update task status so we know which worker the task executed on.
updated_status = f'Task is running on {worker_name}'
self.update_task_status(self, updated_status)
self.result = self.run(evidence, self.result)

# pylint: disable=broad-except
Expand Down Expand Up @@ -1161,7 +1150,7 @@ def run_wrapper(self, evidence):
failure_message = (
'Task Result was auto-closed from task executor on {0:s} likely '
'due to previous failures. Previous status: [{1:s}]'.format(
self.result.worker_name, status))
self.worker_name, status))
self.result.log(failure_message)
try:
self.result.close(self, False, failure_message)
Expand Down Expand Up @@ -1197,23 +1186,3 @@ def run(self, evidence, result):
TurbiniaTaskResult object.
"""
raise NotImplementedError

def update_task_status(self, task, status=None):
"""Updates the task status and pushes it directly to datastore.
Args:
task (TurbiniaTask): The calling Task object
status (str): Brief word or phrase for Task state. If not supplied, the
existing Task status will be used.
"""
if not status:
return
if not self.state_manager:
self.state_manager = state_manager.get_state_manager()
if self.state_manager:
task_key = self.state_manager.redis_client.build_key_name('task', task.id)
self.state_manager.redis_client.set_attribute(
task_key, 'status', json.dumps(status))
self.state_manager.update_request_task(task)
else:
log.info('No state_manager initialized, not updating Task info')
26 changes: 12 additions & 14 deletions turbinia/workers/workers_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ def setResults(

self.result.input_evidence = evidence.RawDisk()
self.result.status = 'TestStatus'
self.update_task_status = mock.MagicMock()
self.result.close = mock.MagicMock()
self.task.setup = mock.MagicMock(return_value=setup)
self.result.worker_name = 'worker1'
Expand Down Expand Up @@ -147,8 +146,10 @@ def testTurbiniaTaskSerialize(self):
self.assertEqual(out_obj.__dict__, self.plaso_task.__dict__)

@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
@mock.patch('turbinia.workers.TurbiniaTask.update_task_status')
def testTurbiniaTaskRunWrapper(self, _, __):
def testTurbiniaTaskRunWrapper(
self,
_,
):
"""Test that the run wrapper executes task run."""
self.setResults()
self.result.closed = True
Expand All @@ -159,8 +160,10 @@ def testTurbiniaTaskRunWrapper(self, _, __):
self.result.close.assert_not_called()

@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
@mock.patch('turbinia.workers.TurbiniaTask.update_task_status')
def testTurbiniaTaskRunWrapperAutoClose(self, _, __):
def testTurbiniaTaskRunWrapperAutoClose(
self,
_,
):
"""Test that the run wrapper closes the task."""
self.setResults()
new_result = self.task.run_wrapper(self.evidence.__dict__)
Expand All @@ -170,8 +173,7 @@ def testTurbiniaTaskRunWrapperAutoClose(self, _, __):

@mock.patch('turbinia.state_manager.get_state_manager')
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
@mock.patch('turbinia.workers.TurbiniaTask.update_task_status')
def testTurbiniaTaskRunWrapperBadResult(self, _, __, ___):
def testTurbiniaTaskRunWrapperBadResult(self, _, __):
"""Test that the run wrapper recovers from run returning bad result."""
bad_result = 'Not a TurbiniaTaskResult'
checked_result = TurbiniaTaskResult(base_output_dir=self.base_output_dir)
Expand All @@ -185,8 +187,7 @@ def testTurbiniaTaskRunWrapperBadResult(self, _, __, ___):
self.assertIn('CheckedResult', new_result.status)

@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
@mock.patch('turbinia.workers.TurbiniaTask.update_task_status')
def testTurbiniaTaskJobUnavailable(self, _, __):
def testTurbiniaTaskJobUnavailable(self, _):
"""Test that the run wrapper can fail if the job doesn't exist."""
self.setResults()
self.task.job_name = 'non_exist'
Expand All @@ -198,8 +199,7 @@ def testTurbiniaTaskJobUnavailable(self, _, __):
self.assertEqual(new_result.status, canary_status)

@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
@mock.patch('turbinia.workers.TurbiniaTask.update_task_status')
def testTurbiniaTaskRunWrapperExceptionThrown(self, _, __):
def testTurbiniaTaskRunWrapperExceptionThrown(self, _):
"""Test that the run wrapper recovers from run throwing an exception."""
self.setResults()
self.task.run = mock.MagicMock(side_effect=TurbiniaException)
Expand Down Expand Up @@ -253,9 +253,7 @@ def testTurbiniaTaskValidateResultBadResult(self, _, __):

@mock.patch('turbinia.workers.evidence_decode')
@mock.patch('turbinia.redis_client.RedisClient.set_attribute')
@mock.patch('turbinia.workers.TurbiniaTask.update_task_status')
def testTurbiniaTaskEvidenceValidationFailure(
self, _, __, evidence_decode_mock):
def testTurbiniaTaskEvidenceValidationFailure(self, _, evidence_decode_mock):
"""Tests Task fails when evidence validation fails."""
self.setResults()
test_evidence = evidence.RawDisk()
Expand Down
22 changes: 17 additions & 5 deletions web/src/components/TaskDetails.vue
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,17 @@ limitations under the License.
<v-alert v-if="taskDetails.successful === true" type="success" prominent>
{{ taskDetails.status }}
</v-alert>
<v-alert v-else-if="taskDetails.successful === null" type="info" prominent>
<div v-if="taskDetails.status">
{{ taskDetails.status }}
</div>
<div v-else>Task {{ taskDetails.id }} is pending</div>
<v-alert v-else-if="taskDetails.successful === false" type="error" prominent>
{{ taskDetails.status }}
</v-alert>
<v-alert v-else-if="taskDetails.celery_state === 'STARTED'" type="info" prominent>
Task {{ taskDetails.id }} is running on {{ taskDetails.worker_name }}
</v-alert>
<v-alert v-else-if="taskDetails.celery_state === 'PENDING'" type="info" prominent>
Task {{ taskDetails.id }} is pending.
</v-alert>
<v-alert v-else-if="taskDetails.celery_state === 'RECEIVED'" type="info" prominent>
Task {{ taskDetails.id }} is queued.
</v-alert>
<v-alert v-else type="error" prominent>
{{ taskDetails.status }}
Expand Down Expand Up @@ -72,6 +78,12 @@ limitations under the License.
</div>
<div v-else>N/A</div>
</v-list-item>
<v-list-item title="Celery State:">
<div v-if="taskDetails.celery_state">
{{ taskDetails.celery_state }}
</div>
<div v-else>N/A</div>
</v-list-item>
<v-list-item title="Evidence ID:">
<div v-if="taskDetails.evidence_id">
{{ taskDetails.evidence_id }}
Expand Down
12 changes: 9 additions & 3 deletions web/src/components/TaskList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,20 @@ export default {
let data = response.data['tasks']
for (const task in data) {
let task_dict = data[task]
let taskStatusTemp = task_dict.status
let taskStatusTemp = task_dict.celery_state
// As pending status requests show as null or pending
if (taskStatusTemp === null || taskStatusTemp === "pending") {
if (taskStatusTemp === null || taskStatusTemp === "PENDING") {
taskStatusTemp = 'is pending on server.'
}
else if (taskStatusTemp == "queued") {
else if (taskStatusTemp == "RECEIVED") {
taskStatusTemp = 'is queued for execution.'
}
else if (taskStatusTemp == "STARTED") {
taskStatusTemp = 'is running on ' + task_dict.worker_name
}
else {
taskStatusTemp = task_dict.status
}
if (this.filterJobs.length > 0) {
let jobName = task_dict.job_name.toLowerCase()
if (this.radioFilter && !this.filterJobs.includes(jobName)) {
Expand Down

0 comments on commit c290576

Please sign in to comment.