Skip to content

Commit

Permalink
Various updates to task management
Browse files Browse the repository at this point in the history
  • Loading branch information
jleaniz committed Sep 17, 2024
1 parent 23eb6bd commit 4c4cbc0
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 32 deletions.
1 change: 0 additions & 1 deletion turbinia/api/cli/turbinia_client/core/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,3 @@ def submit_group(ctx: click.Context):
of available evidence types.
"""
ctx.invoke(setup_submit)
click.echo(submit_group.get_help(ctx))
4 changes: 2 additions & 2 deletions turbinia/evidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ def preprocess(self, task_id, tmp_dir=None, required_states=None):
if self.resource_tracked:
# Track resource and task id in state file
log.debug(
'Evidence {0:s} is resource tracked. Acquiring filelock for'
'Evidence {0:s} is resource tracked. Acquiring filelock for '
'preprocessing'.format(self.name))
with filelock.FileLock(config.RESOURCE_FILE_LOCK):
resource_manager.PreprocessResourceState(self.resource_id, task_id)
Expand Down Expand Up @@ -565,7 +565,7 @@ def postprocess(self, task_id):

if self.resource_tracked:
log.debug(
'Evidence: {0:s} is resource tracked. Acquiring filelock for '
'Evidence {0:s} is resource tracked. Acquiring filelock for '
'postprocessing.'.format(self.name))
with filelock.FileLock(config.RESOURCE_FILE_LOCK):
# Run postprocess to either remove task_id or resource_id.
Expand Down
5 changes: 3 additions & 2 deletions turbinia/state_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,11 @@ def update_request_task(self, task) -> None:
self.redis_client.add_to_list(request_key, 'failed_tasks', task.id)
statuses_to_remove.remove('failed_tasks')
task_status = self.redis_client.get_attribute(task_key, 'status')
if task_status == 'running':
if task_status and 'running' in task_status:
self.redis_client.add_to_list(request_key, 'running_tasks', task.id)
statuses_to_remove.remove('running_tasks')
elif task_status is None or task_status == 'queued':
elif (task_status is None or task_status == 'queued' or
task_status == 'pending'):
self.redis_client.add_to_list(request_key, 'queued_tasks', task.id)
statuses_to_remove.remove('queued_tasks')
for status_name in statuses_to_remove:
Expand Down
44 changes: 38 additions & 6 deletions turbinia/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

if config.TASK_MANAGER.lower() == 'celery':
from celery import states as celery_states
from kombu import exceptions as kombu_exceptions
from turbinia import tcelery as turbinia_celery

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -514,8 +515,13 @@ def process_result(self, task_result: workers.TurbiniaTaskResult):
else:
log.info(
f'Task {task_result.task_id} {task_result.task_name} '
f'from {task_result.worker_name} executed with status [{task_result.status}]'
)
f'from {task_result.worker_name} executed with status '
f'[{task_result.status}]')

task_key = self.state_manager.redis_client.build_key_name(
'task', task_result.id)
self.state_manager.redis_client.set_attribute(
task_key, 'successful', 'false')

if not isinstance(task_result.evidence, list):
log.warning(
Expand Down Expand Up @@ -613,6 +619,21 @@ def run(self, under_test=False):

time.sleep(config.SLEEP_TIME)

def close_failed_task(self, task):
result = workers.TurbiniaTaskResult(
request_id=task.request_id, no_output_manager=True,
no_state_manager=True)
result.setup(task)
if task.stub.traceback:
result.status = (
f'Task {task.id} failed with exception: {task.stub.traceback}')
else:
result.status = f'Task {task.id} failed.'
result.successful = False
result.closed = True
task.result = result
return task

def timeout_task(self, task, timeout):
"""Sets status and result data for timed out Task.
Expand Down Expand Up @@ -680,15 +701,19 @@ def process_tasks(self):
check_timeout = True
elif celery_task.status == celery_states.STARTED:
log.debug(f'Task {celery_task.id:s} not finished.')
# set status here too
check_timeout = True
elif celery_task.status == celery_states.FAILURE:
log.warning(f'Task {celery_task.id:s} failed.')
self.close_failed_task(task)
completed_tasks.append(task)
elif celery_task.status == celery_states.SUCCESS:
task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result)
completed_tasks.append(task)
elif celery_task.status == celery_states.PENDING:
task.status = 'pending'
# set status here too
check_timeout = True
log.debug(f'Task {celery_task.id:s} status pending.')
elif celery_task.status == celery_states.REVOKED:
message = (
Expand Down Expand Up @@ -777,7 +802,14 @@ def enqueue_task(self, task, evidence_, timeout):
self.celery_runner.task_time_limit = celery_hard_timeout
# Time limits described here:
# https://docs.celeryq.dev/en/stable/userguide/workers.html#time-limits
task.stub = self.celery_runner.apply_async(
(task.serialize(), evidence_.serialize()), retry=False,
soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
expires=config.CELERY_TASK_EXPIRATION_TIME)
try:
task.stub = self.celery_runner.apply_async(
(task.serialize(), evidence_.serialize()), retry=True,
soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
expires=config.CELERY_TASK_EXPIRATION_TIME)
# Save the celery task identifier for traceability between
# Turbinia tasks and Celery tasks.
task.celery_id = task.stub.id
self.state_manager.update_task(task)
except kombu_exceptions.OperationalError as exception:
log.error(f'Error queueing task: {exception}')
14 changes: 9 additions & 5 deletions turbinia/task_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ def task_deserialize(input_dict):
def task_runner(obj, *args, **kwargs):
"""Wrapper function to run specified TurbiniaTask object.
This is the method Celery tasks will execute. Any Python exceptions
raised from this method will cause the Celery task to fail.
Args:
obj: An instantiated TurbiniaTask object.
*args: Any Args to pass to obj.
Expand All @@ -198,14 +201,15 @@ def task_runner(obj, *args, **kwargs):
Output from TurbiniaTask (should be TurbiniaTaskResult).
"""
obj = task_deserialize(obj)
# Celery is configured to receive only one Task per worker
# so no need to create a FileLock.
lock = None
try:
lock = filelock.FileLock(config.LOCK_FILE)
with lock.acquire(timeout=10):
run = obj.run_wrapper(*args, **kwargs)
return run
except filelock.Timeout:
raise TurbiniaException(f'Could not acquire lock on {config.LOCK_FILE}')
log.error(f'Could not acquire lock on {config.LOCK_FILE}')
finally:
lock.release()
return run
if lock:
lock.release()
return
4 changes: 3 additions & 1 deletion turbinia/tcelery.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def setup(self):
worker_cancel_long_running_tasks_on_connection_loss=True,
worker_concurrency=1,
worker_prefetch_multiplier=1,
)
task_acks_late=True, # ack task after execution
task_reject_on_worker_lost=True, # Re-queue task if celery worker abruptly exists
worker_deduplicate_successful_tasks=True) # avoid task duplication


class TurbiniaKombu(TurbiniaMessageBase):
Expand Down
4 changes: 1 addition & 3 deletions turbinia/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from turbinia import job_utils
from turbinia.lib import docker_manager
from turbinia.jobs import manager as job_manager
from turbinia.tcelery import TurbiniaCelery

config.LoadConfig()
task_manager_type = config.TASK_MANAGER.lower()
Expand Down Expand Up @@ -249,7 +248,6 @@ def start(self):
# no apparent benefit from having this enabled at the moment.
self.worker.task(task_utils.task_runner, name='task_runner')
argv = [
'worker', '--loglevel=info', '--concurrency=1', '--without-gossip',
'--without-mingle'
'worker', '--loglevel=info', '--concurrency=1', '--without-gossip', '-E'
]
self.worker.start(argv)
17 changes: 11 additions & 6 deletions turbinia/workers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ class TurbiniaTask:
STORED_ATTRIBUTES = [
'id', 'job_id', 'job_name', 'start_time', 'last_update', 'name',
'evidence_name', 'evidence_id', 'request_id', 'requester', 'group_name',
'reason', 'group_id'
'reason', 'group_id', 'celery_id'
]

# The list of evidence states that are required by a Task in order to run.
Expand Down Expand Up @@ -482,6 +482,7 @@ def __init__(
self.base_output_dir = config.OUTPUT_DIR

self.id = uuid.uuid4().hex
self.celery_id = None
self.is_finalize_task = False
self.job_id = None
self.job_name = None
Expand All @@ -507,7 +508,6 @@ def __init__(
self.group_name = group_name
self.reason = reason
self.group_id = group_id
self.worker_name = platform.node()

def serialize(self):
"""Converts the TurbiniaTask object into a serializable dict.
Expand Down Expand Up @@ -1097,7 +1097,13 @@ def run_wrapper(self, evidence):
self._evidence_config = evidence.config
self.task_config = self.get_task_recipe(evidence.config)
self.worker_start_time = datetime.now()
updated_status = f'{self.id} is running on worker {self.worker_name}'
# Update task status so we know which worker the task executed on.
worker_name = platform.node()
updated_status = f'Task is running on {worker_name}'
task_key = self.state_manager.redis_client.build_key_name(
'task', self.id)
self.state_manager.redis_client.set_attribute(
task_key, 'worker_name', json.dumps(worker_name))
self.update_task_status(self, updated_status)
self.result = self.run(evidence, self.result)

Expand Down Expand Up @@ -1194,9 +1200,8 @@ def update_task_status(self, task, status=None):
status (str): Brief word or phrase for Task state. If not supplied, the
existing Task status will be used.
"""
if status:
task.status = 'Task {0!s} is {1!s} on {2!s}'.format(
self.name, status, self.worker_name)
if not status:
return
if not self.state_manager:
self.state_manager = state_manager.get_state_manager()
if self.state_manager:
Expand Down
6 changes: 5 additions & 1 deletion web/src/components/RequestDetails.vue
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ limitations under the License.
</v-alert>
<v-alert v-else-if="requestDetails.status === 'running'" type="info" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> has <strong>{{ requestDetails.task_count -
requestDetails.successful_tasks - requestDetails.failed_tasks }}</strong> Tasks remaining.
requestDetails.successful_tasks - requestDetails.failed_tasks }}</strong> Tasks remaining.
</v-alert>
<v-alert v-else-if="requestDetails.status === 'completed_with_errors'" type="warning" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> completed with <strong>{{ requestDetails.failed_tasks
}}</strong> failed Tasks.
</v-alert>
<v-alert v-else-if="requestDetails.status === 'pending'" type="info" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> has <strong>{{ requestDetails.queued_tasks
}}</strong> Tasks pending.
</v-alert>
<v-alert v-else type="error" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> was not successful.
</v-alert>
Expand Down
18 changes: 14 additions & 4 deletions web/src/components/TaskDetails.vue
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ limitations under the License.
{{ taskDetails.status }}
</v-alert>
<v-alert v-else-if="taskDetails.successful === null" type="info" prominent>
{{ taskDetails.status }}
<div v-if="taskDetails.status">
{{ taskDetails.status }}
</div>
<div v-else>Task {{ taskDetails.id }} is pending</div>
</v-alert>
<v-alert v-else type="error" prominent>
{{ taskDetails.status }}
Expand All @@ -63,6 +66,12 @@ limitations under the License.
</div>
<div v-else>N/A</div>
</v-list-item>
<v-list-item title="Celery ID:">
<div v-if="taskDetails.celery_id">
{{ taskDetails.celery_id }}
</div>
<div v-else>N/A</div>
</v-list-item>
<v-list-item title="Evidence ID:">
<div v-if="taskDetails.evidence_id">
{{ taskDetails.evidence_id }}
Expand Down Expand Up @@ -107,10 +116,10 @@ limitations under the License.
</template>
</v-tooltip>
</template>
<v-snackbar v-model="evidenceSnackbar" color="primary" location="top" height="55" timeout="2000">
<v-snackbar v-model="evidenceSnackbar" color="primary" location="top" height="55" timeout="2000">
Evidence output is downloading...
</v-snackbar>
<v-snackbar v-model="notCopyable" color="red" location="top" height="55" timeout="2000">
<v-snackbar v-model="notCopyable" color="red" location="top" height="55" timeout="2000">
Evidence type is not supported for downloading.
</v-snackbar>
<div v-if="taskDetails.evidence_name">
Expand Down Expand Up @@ -140,7 +149,8 @@ limitations under the License.
<template v-if="taskDetails.worker_name" v-slot:append>
<v-tooltip location="top" text="Download Worker Logs (defaults to most recent 500 entries)">
<template v-slot:activator="{ props: tooltip }">
<v-btn icon="mdi-database-outline" v-bind="tooltip" @click="downloadWorkerLogs(taskDetails.worker_name)">
<v-btn icon="mdi-database-outline" v-bind="tooltip"
@click="downloadWorkerLogs(taskDetails.worker_name)">
</v-btn>
</template>
</v-tooltip>
Expand Down
2 changes: 1 addition & 1 deletion web/src/components/TaskList.vue
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ limitations under the License.
</v-list-item-action>
</div>
<v-list-item :max-width="800">
{{ item.task_name }}: {{ $filters.truncate(item.task_status, 384, '...') }}
{{ item.task_name }} {{ $filters.truncate(item.task_status, 384, '...') }}
</v-list-item>
</v-list-item>
<v-divider> </v-divider>
Expand Down

0 comments on commit 4c4cbc0

Please sign in to comment.