Various updates to task management #1546

Merged Oct 1, 2024 (10 commits)
Changes from 9 commits
2 changes: 1 addition & 1 deletion docker/local/local-config.sed
@@ -1,5 +1,5 @@
s/CELERY_BROKER = .*/CELERY_BROKER = 'redis:\/\/redis'/g
s/CELERY_BACKEND = .*/CELERY_BACKEND = 'redis:\/\/redis'/g
s/CELERY_BACKEND = .*/CELERY_BACKEND = 'redis:\/\/redis\/1'/g
s/REDIS_HOST = .*/REDIS_HOST = 'redis'/g
s/PROMETHEUS_ENABLED = .*/PROMETHEUS_ENABLED = False/g
s/STATE_MANAGER = .*/STATE_MANAGER = 'Redis'/g
1 change: 0 additions & 1 deletion turbinia/api/cli/turbinia_client/core/groups.py
@@ -87,4 +87,3 @@ def submit_group(ctx: click.Context):
of available evidence types.
"""
ctx.invoke(setup_submit)
click.echo(submit_group.get_help(ctx))
2 changes: 1 addition & 1 deletion turbinia/config/turbinia_config_tmpl.py
@@ -317,7 +317,7 @@
CELERY_BROKER = f'redis://{REDIS_HOST}'

# Storage for task results/status
CELERY_BACKEND = f'redis://{REDIS_HOST}'
CELERY_BACKEND = f'redis://{REDIS_HOST}/1'

# Task expiration (in seconds). Tasks will be revoked
# after the expiration time elapses. Revoked tasks will not
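Note (not part of the diff): the trailing /1 in the new backend URL points Celery's result metadata at Redis logical database 1, so task results no longer share logical database 0 with the broker queue. An illustrative sketch of the same split, using placeholder names rather than Turbinia's config:

# Illustrative only: broker and result backend on separate Redis databases.
from celery import Celery

app = Celery(
    'example',
    broker='redis://redis',     # no suffix -> logical database 0
    backend='redis://redis/1',  # task results kept in logical database 1
)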
4 changes: 2 additions & 2 deletions turbinia/evidence.py
@@ -533,7 +533,7 @@ def preprocess(self, task_id, tmp_dir=None, required_states=None):
if self.resource_tracked:
# Track resource and task id in state file
log.debug(
'Evidence {0:s} is resource tracked. Acquiring filelock for'
'Evidence {0:s} is resource tracked. Acquiring filelock for '
'preprocessing'.format(self.name))
with filelock.FileLock(config.RESOURCE_FILE_LOCK):
resource_manager.PreprocessResourceState(self.resource_id, task_id)
@@ -565,7 +565,7 @@ def postprocess(self, task_id):

if self.resource_tracked:
log.debug(
'Evidence: {0:s} is resource tracked. Acquiring filelock for '
'Evidence {0:s} is resource tracked. Acquiring filelock for '
'postprocessing.'.format(self.name))
with filelock.FileLock(config.RESOURCE_FILE_LOCK):
# Run postprocess to either remove task_id or resource_id.
8 changes: 5 additions & 3 deletions turbinia/state_manager.py
@@ -292,10 +292,11 @@ def update_request_task(self, task) -> None:
self.redis_client.add_to_list(request_key, 'failed_tasks', task.id)
statuses_to_remove.remove('failed_tasks')
task_status = self.redis_client.get_attribute(task_key, 'status')
if task_status == 'running':
if task_status and 'Task is running' in task_status:
self.redis_client.add_to_list(request_key, 'running_tasks', task.id)
statuses_to_remove.remove('running_tasks')
elif task_status is None or task_status == 'queued':
elif (task_status is None or task_status == 'queued' or
task_status == 'pending'):
self.redis_client.add_to_list(request_key, 'queued_tasks', task.id)
statuses_to_remove.remove('queued_tasks')
for status_name in statuses_to_remove:
@@ -615,7 +616,8 @@ def get_request_status(self, request_id):
request_status = 'successful'
elif len(request_data['task_ids']) == len(request_data['failed_tasks']):
request_status = 'failed'
elif len(request_data['running_tasks']) > 0:
elif len(request_data['running_tasks']) > 0 or len(
request_data['queued_tasks']) > 0:
request_status = 'running'
elif len(request_data['failed_tasks']) > 0 and all_tasks_finished:
request_status = 'completed_with_errors'
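Note: taken together, these two hunks rebucket tasks by status string. A task counts as running when its status contains 'Task is running' (the status written by run_wrapper() later in this PR), queued tasks now also cover the new 'pending' state, and a request is reported as running while any task is still running or queued. A simplified, illustrative sketch of the decision order in get_request_status() — the successful-tasks check and the final fallback are assumptions, since they sit outside the visible hunk:

def summarize_request(request_data: dict, all_tasks_finished: bool) -> str:
  # Simplified sketch, not the Turbinia implementation.
  total = len(request_data['task_ids'])
  if total == len(request_data['successful_tasks']):  # assumed, outside hunk
    return 'successful'
  if total == len(request_data['failed_tasks']):
    return 'failed'
  # Running OR queued tasks keep the whole request in 'running'.
  if request_data['running_tasks'] or request_data['queued_tasks']:
    return 'running'
  if request_data['failed_tasks'] and all_tasks_finished:
    return 'completed_with_errors'
  return 'pending'  # assumed fallback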
57 changes: 50 additions & 7 deletions turbinia/task_manager.py
@@ -37,6 +37,7 @@

if config.TASK_MANAGER.lower() == 'celery':
from celery import states as celery_states
from kombu import exceptions as kombu_exceptions
from turbinia import tcelery as turbinia_celery

log = logging.getLogger(__name__)
@@ -514,8 +515,13 @@ def process_result(self, task_result: workers.TurbiniaTaskResult):
else:
log.info(
f'Task {task_result.task_id} {task_result.task_name} '
f'from {task_result.worker_name} executed with status [{task_result.status}]'
)
f'from {task_result.worker_name} executed with status '
f'[{task_result.status}]')

task_key = self.state_manager.redis_client.build_key_name(
'task', task_result.id)
self.state_manager.redis_client.set_attribute(
task_key, 'successful', 'false')

if not isinstance(task_result.evidence, list):
log.warning(
@@ -613,6 +619,29 @@ def run(self, under_test=False):

time.sleep(config.SLEEP_TIME)

def close_failed_task(self, task):
"""Sets status and result data for failed Task.

Args:
task(TurbiniaTask): The Task that will be closed.

Returns:
TurbiniaTask: The updated Task.
"""
result = workers.TurbiniaTaskResult(
request_id=task.request_id, no_output_manager=True,
no_state_manager=True)
result.setup(task)
if task.stub.traceback:
result.status = (
f'Task {task.id} failed with exception: {task.stub.traceback}')
else:
result.status = f'Task {task.id} failed.'
result.successful = False
result.closed = True
task.result = result
return task

def timeout_task(self, task, timeout):
"""Sets status and result data for timed out Task.

@@ -679,17 +708,24 @@ def process_tasks(self):
log.debug(f'Task {task.stub.task_id:s} not yet created.')
check_timeout = True
elif celery_task.status == celery_states.STARTED:
# Task status will be set to running when the worker executes run_wrapper()
log.debug(f'Task {celery_task.id:s} not finished.')
check_timeout = True
elif celery_task.status == celery_states.FAILURE:
log.warning(f'Task {celery_task.id:s} failed.')
self.close_failed_task(task)
completed_tasks.append(task)
elif celery_task.status == celery_states.SUCCESS:
task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result)
completed_tasks.append(task)
elif celery_task.status == celery_states.PENDING:
task.status = 'pending'
log.debug(f'Task {celery_task.id:s} status pending.')
check_timeout = True
log.debug(f'Task {celery_task.id:s} is pending.')
elif celery_task.status == celery_states.RECEIVED:
task.status = 'queued'
check_timeout = True
log.debug(f'Task {celery_task.id:s} is queued.')
elif celery_task.status == celery_states.REVOKED:
message = (
f'Celery task {celery_task.id:s} associated with Turbinia '
@@ -777,7 +813,14 @@ def enqueue_task(self, task, evidence_, timeout):
self.celery_runner.task_time_limit = celery_hard_timeout
# Time limits described here:
# https://docs.celeryq.dev/en/stable/userguide/workers.html#time-limits
task.stub = self.celery_runner.apply_async(
(task.serialize(), evidence_.serialize()), retry=False,
soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
expires=config.CELERY_TASK_EXPIRATION_TIME)
try:
task.stub = self.celery_runner.apply_async(
(task.serialize(), evidence_.serialize()), retry=True,
soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
expires=config.CELERY_TASK_EXPIRATION_TIME)
# Save the celery task identifier for traceability between
# Turbinia tasks and Celery tasks.
task.celery_id = task.stub.id
self.state_manager.update_task(task)
except kombu_exceptions.OperationalError as exception:
log.error(f'Error queueing task: {exception}')
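Note on the hunks above: process_tasks() now distinguishes PENDING (no record in the result backend yet) from RECEIVED (accepted by a worker but not started), recording them as 'pending' and 'queued' respectively, and a FAILURE state is closed out through the new close_failed_task() helper; enqueue_task() additionally retries publishing (retry=True) and catches kombu.exceptions.OperationalError so a broker outage is logged instead of crashing the server. A standalone, illustrative sketch of the state-to-status mapping, assuming app is a configured Celery instance:

from celery import states as celery_states
from celery.result import AsyncResult

def celery_state_to_status(celery_id: str, app) -> str:
  # Illustrative mapping only; Turbinia does this inline in process_tasks().
  result = AsyncResult(celery_id, app=app)
  if result.state == celery_states.PENDING:
    return 'pending'   # backend has no record yet (or the task expired)
  if result.state == celery_states.RECEIVED:
    return 'queued'    # accepted by a worker, run_wrapper() not called yet
  if result.state == celery_states.STARTED:
    return 'running'   # the worker writes a richer status once it starts
  return result.state.lower()  # SUCCESS / FAILURE / REVOKED handled separately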
14 changes: 9 additions & 5 deletions turbinia/task_utils.py
@@ -189,6 +189,9 @@ def task_deserialize(input_dict):
def task_runner(obj, *args, **kwargs):
"""Wrapper function to run specified TurbiniaTask object.

This is the method Celery tasks will execute. Any Python exceptions
raised from this method will cause the Celery task to fail.

Args:
obj: An instantiated TurbiniaTask object.
*args: Any Args to pass to obj.
@@ -198,14 +201,15 @@ def task_runner(obj, *args, **kwargs):
Output from TurbiniaTask (should be TurbiniaTaskResult).
"""
obj = task_deserialize(obj)
# Celery is configured to receive only one Task per worker
# so no need to create a FileLock.
lock = None
try:
lock = filelock.FileLock(config.LOCK_FILE)
with lock.acquire(timeout=10):
run = obj.run_wrapper(*args, **kwargs)
return run
except filelock.Timeout:
raise TurbiniaException(f'Could not acquire lock on {config.LOCK_FILE}')
log.error(f'Could not acquire lock on {config.LOCK_FILE}')
finally:
lock.release()
return run
if lock:
lock.release()
return
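Note: the new docstring above is what the FAILURE handling in task_manager.py relies on — an exception that escapes task_runner() puts the Celery task into FAILURE, which is presumably why the filelock timeout is now logged rather than re-raised. A toy illustration with placeholder task names (not Turbinia code):

from celery import Celery

app = Celery('example', broker='redis://redis', backend='redis://redis/1')

@app.task
def escapes():
  raise RuntimeError('boom')    # the Celery task ends in FAILURE

@app.task
def handled():
  try:
    raise RuntimeError('boom')
  except RuntimeError as exc:
    return f'handled: {exc}'    # the Celery task ends in SUCCESS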
9 changes: 7 additions & 2 deletions turbinia/tcelery.py
@@ -50,13 +50,18 @@ def setup(self):
self.app = celery.Celery(
'turbinia', broker=config.CELERY_BROKER, backend=config.CELERY_BACKEND)
self.app.conf.update(
accept_content=['json'],
broker_connection_retry_on_startup=True,
# Store Celery task results metadata
result_backend=config.CELERY_BACKEND,
task_default_queue=config.INSTANCE_ID,
accept_content=['json'],
# Re-queue task if Celery worker abruptly exits
task_reject_on_worker_lost=True,
worker_cancel_long_running_tasks_on_connection_loss=True,
worker_concurrency=1,
worker_prefetch_multiplier=1,
)
# Avoid task duplication
worker_deduplicate_successful_tasks=True)


class TurbiniaKombu(TurbiniaMessageBase):
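Note, based on Celery's documented settings rather than this diff: worker_concurrency=1 together with worker_prefetch_multiplier=1 keeps each worker reserving and executing a single task at a time; task_reject_on_worker_lost=True re-queues a task whose worker process died mid-run; worker_cancel_long_running_tasks_on_connection_loss=True aborts in-flight tasks when the broker connection drops so they can be safely re-delivered; and worker_deduplicate_successful_tasks=True makes a worker skip a re-delivered task id that the result backend already recorded as successful.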
4 changes: 1 addition & 3 deletions turbinia/worker.py
@@ -27,7 +27,6 @@
from turbinia import job_utils
from turbinia.lib import docker_manager
from turbinia.jobs import manager as job_manager
from turbinia.tcelery import TurbiniaCelery

config.LoadConfig()
task_manager_type = config.TASK_MANAGER.lower()
@@ -249,7 +248,6 @@ def start(self):
# no apparent benefit from having this enabled at the moment.
self.worker.task(task_utils.task_runner, name='task_runner')
argv = [
'worker', '--loglevel=info', '--concurrency=1', '--without-gossip',
'--without-mingle'
'worker', '--loglevel=info', '--concurrency=1', '--without-gossip', '-E'
]
self.worker.start(argv)
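Note: the added -E flag turns on Celery task events so task state transitions can be observed by external monitors, and --without-mingle is no longer passed, so worker startup synchronization falls back to Celery's default behaviour.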
16 changes: 10 additions & 6 deletions turbinia/workers/__init__.py
@@ -454,7 +454,7 @@ class TurbiniaTask:
STORED_ATTRIBUTES = [
'id', 'job_id', 'job_name', 'start_time', 'last_update', 'name',
'evidence_name', 'evidence_id', 'request_id', 'requester', 'group_name',
'reason', 'group_id'
'reason', 'group_id', 'celery_id'
]

# The list of evidence states that are required by a Task in order to run.
@@ -482,6 +482,7 @@ def __init__(
self.base_output_dir = config.OUTPUT_DIR

self.id = uuid.uuid4().hex
self.celery_id = None
self.is_finalize_task = False
self.job_id = None
self.job_name = None
@@ -507,7 +508,6 @@ def __init__(
self.group_name = group_name
self.reason = reason
self.group_id = group_id
self.worker_name = platform.node()

def serialize(self):
"""Converts the TurbiniaTask object into a serializable dict.
@@ -1039,10 +1039,14 @@ def run_wrapper(self, evidence):

log.debug(f'Task {self.name:s} {self.id:s} awaiting execution')
failure_message = None
worker_name = platform.node()
try:
evidence = evidence_decode(evidence)
self.result = self.setup(evidence)
self.update_task_status(self, 'queued')
task_key = self.state_manager.redis_client.build_key_name('task', self.id)
self.state_manager.redis_client.set_attribute(
task_key, 'worker_name', json.dumps(worker_name))
turbinia_worker_tasks_queued_total.inc()
task_runtime_metrics = self.get_metrics()
except TurbiniaException as exception:
@@ -1097,7 +1101,8 @@ def run_wrapper(self, evidence):
self._evidence_config = evidence.config
self.task_config = self.get_task_recipe(evidence.config)
self.worker_start_time = datetime.now()
updated_status = f'{self.id} is running on worker {self.worker_name}'
# Update task status so we know which worker the task executed on.
updated_status = f'Task is running on {worker_name}'
self.update_task_status(self, updated_status)
self.result = self.run(evidence, self.result)

@@ -1194,9 +1199,8 @@ def update_task_status(self, task, status=None):
status (str): Brief word or phrase for Task state. If not supplied, the
existing Task status will be used.
"""
if status:
task.status = 'Task {0!s} is {1!s} on {2!s}'.format(
self.name, status, self.worker_name)
if not status:
return
if not self.state_manager:
self.state_manager = state_manager.get_state_manager()
if self.state_manager:
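Note: the 'Task is running on {worker_name}' status written in run_wrapper() above is the same string that update_request_task() in state_manager.py now matches with a 'Task is running' substring check, so the two must stay in sync. A trivial illustration:

import platform

updated_status = f'Task is running on {platform.node()}'  # written by run_wrapper()
assert 'Task is running' in updated_status                # checked by update_request_task()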
2 changes: 1 addition & 1 deletion turbinia/workers/dfdewey.py
@@ -91,7 +91,7 @@ def run(self, evidence, result):
result.log(status_summary)
else:
status_summary = (
'Not running dfDewey. Case was not provided in task config.')
'dfDewey will not execute. Case was not provided in task config.')
result.log(status_summary)

result.close(self, success=success, status=status_summary)
2 changes: 1 addition & 1 deletion turbinia/workers/fsstat.py
@@ -45,7 +45,7 @@ def run(self, evidence, result):
# Since fsstat does not support some filesystems, we won't run it when we
# know the partition is not supported.
elif evidence.path_spec.type_indicator in ("APFS", "XFS"):
message = 'Not running fsstat since partition is not supported'
message = 'Not processing since partition is not supported'
result.log(message)
result.close(self, success=True, status=message)
else:
6 changes: 5 additions & 1 deletion web/src/components/RequestDetails.vue
@@ -41,12 +41,16 @@ limitations under the License.
</v-alert>
<v-alert v-else-if="requestDetails.status === 'running'" type="info" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> has <strong>{{ requestDetails.task_count -
requestDetails.successful_tasks - requestDetails.failed_tasks }}</strong> Tasks remaining.
requestDetails.successful_tasks - requestDetails.failed_tasks }}</strong> Tasks remaining.
</v-alert>
<v-alert v-else-if="requestDetails.status === 'completed_with_errors'" type="warning" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> completed with <strong>{{ requestDetails.failed_tasks
}}</strong> failed Tasks.
</v-alert>
<v-alert v-else-if="requestDetails.status === 'pending'" type="info" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> has <strong>{{ requestDetails.queued_tasks
}}</strong> Tasks pending.
</v-alert>
<v-alert v-else type="error" prominent>
Request <strong>{{ requestDetails.request_id }}</strong> was not successful.
</v-alert>