diff --git a/docker/local/local-config.sed b/docker/local/local-config.sed
index 380fbd791..4f79b1908 100644
--- a/docker/local/local-config.sed
+++ b/docker/local/local-config.sed
@@ -1,5 +1,5 @@
s/CELERY_BROKER = .*/CELERY_BROKER = 'redis:\/\/redis'/g
-s/CELERY_BACKEND = .*/CELERY_BACKEND = 'redis:\/\/redis'/g
+s/CELERY_BACKEND = .*/CELERY_BACKEND = 'redis:\/\/redis\/1'/g
s/REDIS_HOST = .*/REDIS_HOST = 'redis'/g
s/PROMETHEUS_ENABLED = .*/PROMETHEUS_ENABLED = False/g
s/STATE_MANAGER = .*/STATE_MANAGER = 'Redis'/g
diff --git a/turbinia/api/cli/turbinia_client/core/groups.py b/turbinia/api/cli/turbinia_client/core/groups.py
index 2640297a7..25d377c30 100644
--- a/turbinia/api/cli/turbinia_client/core/groups.py
+++ b/turbinia/api/cli/turbinia_client/core/groups.py
@@ -87,4 +87,3 @@ def submit_group(ctx: click.Context):
of available evidence types.
"""
ctx.invoke(setup_submit)
- click.echo(submit_group.get_help(ctx))
diff --git a/turbinia/config/turbinia_config_tmpl.py b/turbinia/config/turbinia_config_tmpl.py
index 9272124b3..d6cd5a77d 100644
--- a/turbinia/config/turbinia_config_tmpl.py
+++ b/turbinia/config/turbinia_config_tmpl.py
@@ -317,7 +317,7 @@
CELERY_BROKER = f'redis://{REDIS_HOST}'
# Storage for task results/status
-CELERY_BACKEND = f'redis://{REDIS_HOST}'
+CELERY_BACKEND = f'redis://{REDIS_HOST}/1'
# Task expiration (in seconds). Tasks will be revoked
# after the expiration time elapses. Revoked tasks will not
diff --git a/turbinia/evidence.py b/turbinia/evidence.py
index 5a8b8a4bc..985609812 100644
--- a/turbinia/evidence.py
+++ b/turbinia/evidence.py
@@ -533,7 +533,7 @@ def preprocess(self, task_id, tmp_dir=None, required_states=None):
if self.resource_tracked:
# Track resource and task id in state file
log.debug(
- 'Evidence {0:s} is resource tracked. Acquiring filelock for'
+ 'Evidence {0:s} is resource tracked. Acquiring filelock for '
'preprocessing'.format(self.name))
with filelock.FileLock(config.RESOURCE_FILE_LOCK):
resource_manager.PreprocessResourceState(self.resource_id, task_id)
@@ -565,7 +565,7 @@ def postprocess(self, task_id):
if self.resource_tracked:
log.debug(
- 'Evidence: {0:s} is resource tracked. Acquiring filelock for '
+ 'Evidence {0:s} is resource tracked. Acquiring filelock for '
'postprocessing.'.format(self.name))
with filelock.FileLock(config.RESOURCE_FILE_LOCK):
# Run postprocess to either remove task_id or resource_id.
diff --git a/turbinia/state_manager.py b/turbinia/state_manager.py
index 48e24220d..416ea4a94 100644
--- a/turbinia/state_manager.py
+++ b/turbinia/state_manager.py
@@ -292,10 +292,11 @@ def update_request_task(self, task) -> None:
self.redis_client.add_to_list(request_key, 'failed_tasks', task.id)
statuses_to_remove.remove('failed_tasks')
task_status = self.redis_client.get_attribute(task_key, 'status')
- if task_status == 'running':
+ if task_status and 'Task is running' in task_status:
self.redis_client.add_to_list(request_key, 'running_tasks', task.id)
statuses_to_remove.remove('running_tasks')
- elif task_status is None or task_status == 'queued':
+ elif (task_status is None or task_status == 'queued' or
+ task_status == 'pending'):
self.redis_client.add_to_list(request_key, 'queued_tasks', task.id)
statuses_to_remove.remove('queued_tasks')
for status_name in statuses_to_remove:
@@ -615,7 +616,8 @@ def get_request_status(self, request_id):
request_status = 'successful'
elif len(request_data['task_ids']) == len(request_data['failed_tasks']):
request_status = 'failed'
- elif len(request_data['running_tasks']) > 0:
+ elif len(request_data['running_tasks']) > 0 or len(
+ request_data['queued_tasks']) > 0:
request_status = 'running'
elif len(request_data['failed_tasks']) > 0 and all_tasks_finished:
request_status = 'completed_with_errors'
diff --git a/turbinia/task_manager.py b/turbinia/task_manager.py
index 1f94274a1..2c46f1567 100644
--- a/turbinia/task_manager.py
+++ b/turbinia/task_manager.py
@@ -37,6 +37,7 @@
if config.TASK_MANAGER.lower() == 'celery':
from celery import states as celery_states
+ from kombu import exceptions as kombu_exceptions
from turbinia import tcelery as turbinia_celery
log = logging.getLogger(__name__)
@@ -514,8 +515,8 @@ def process_result(self, task_result: workers.TurbiniaTaskResult):
else:
log.info(
f'Task {task_result.task_id} {task_result.task_name} '
- f'from {task_result.worker_name} executed with status [{task_result.status}]'
- )
+ f'from {task_result.worker_name} executed with status '
+ f'[{task_result.status}]')
if not isinstance(task_result.evidence, list):
log.warning(
@@ -613,6 +614,29 @@ def run(self, under_test=False):
time.sleep(config.SLEEP_TIME)
+ def close_failed_task(self, task):
+ """Sets status and result data for failed Task.
+
+ Args:
+ task(TurbiniaTask): The Task that will be closed.
+
+ Returns:
+ TurbiniaTask: The updated Task.
+ """
+ result = workers.TurbiniaTaskResult(
+ request_id=task.request_id, no_output_manager=True,
+ no_state_manager=True)
+ result.setup(task)
+ if task.stub.traceback:
+ result.status = (
+ f'Task {task.id} failed with exception: {task.stub.traceback}')
+ else:
+ result.status = f'Task {task.id} failed.'
+ result.successful = False
+ result.closed = True
+ task.result = result
+ return task
+
def timeout_task(self, task, timeout):
"""Sets status and result data for timed out Task.
@@ -679,17 +703,24 @@ def process_tasks(self):
log.debug(f'Task {task.stub.task_id:s} not yet created.')
check_timeout = True
elif celery_task.status == celery_states.STARTED:
+ # Task status will be set to running when the worker executes run_wrapper()
log.debug(f'Task {celery_task.id:s} not finished.')
check_timeout = True
elif celery_task.status == celery_states.FAILURE:
log.warning(f'Task {celery_task.id:s} failed.')
+ self.close_failed_task(task)
completed_tasks.append(task)
elif celery_task.status == celery_states.SUCCESS:
task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result)
completed_tasks.append(task)
elif celery_task.status == celery_states.PENDING:
task.status = 'pending'
- log.debug(f'Task {celery_task.id:s} status pending.')
+ check_timeout = True
+ log.debug(f'Task {celery_task.id:s} is pending.')
+ elif celery_task.status == celery_states.RECEIVED:
+ task.status = 'queued'
+ check_timeout = True
+ log.debug(f'Task {celery_task.id:s} is queued.')
elif celery_task.status == celery_states.REVOKED:
message = (
f'Celery task {celery_task.id:s} associated with Turbinia '
@@ -777,7 +808,14 @@ def enqueue_task(self, task, evidence_, timeout):
self.celery_runner.task_time_limit = celery_hard_timeout
# Time limits described here:
# https://docs.celeryq.dev/en/stable/userguide/workers.html#time-limits
- task.stub = self.celery_runner.apply_async(
- (task.serialize(), evidence_.serialize()), retry=False,
- soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
- expires=config.CELERY_TASK_EXPIRATION_TIME)
+ try:
+ task.stub = self.celery_runner.apply_async(
+ (task.serialize(), evidence_.serialize()), retry=True,
+ soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout,
+ expires=config.CELERY_TASK_EXPIRATION_TIME)
+      # Save the Celery task identifier for traceability between
+ # Turbinia tasks and Celery tasks.
+ task.celery_id = task.stub.id
+ self.state_manager.update_task(task)
+ except kombu_exceptions.OperationalError as exception:
+ log.error(f'Error queueing task: {exception}')
diff --git a/turbinia/task_utils.py b/turbinia/task_utils.py
index 0346d985d..d3ca95660 100644
--- a/turbinia/task_utils.py
+++ b/turbinia/task_utils.py
@@ -189,6 +189,9 @@ def task_deserialize(input_dict):
def task_runner(obj, *args, **kwargs):
"""Wrapper function to run specified TurbiniaTask object.
+ This is the method Celery tasks will execute. Any Python exceptions
+ raised from this method will cause the Celery task to fail.
+
Args:
obj: An instantiated TurbiniaTask object.
*args: Any Args to pass to obj.
@@ -198,14 +201,15 @@ def task_runner(obj, *args, **kwargs):
Output from TurbiniaTask (should be TurbiniaTaskResult).
"""
obj = task_deserialize(obj)
- # Celery is configured to receive only one Task per worker
- # so no need to create a FileLock.
+ lock = None
try:
lock = filelock.FileLock(config.LOCK_FILE)
with lock.acquire(timeout=10):
run = obj.run_wrapper(*args, **kwargs)
+ return run
except filelock.Timeout:
- raise TurbiniaException(f'Could not acquire lock on {config.LOCK_FILE}')
+ log.error(f'Could not acquire lock on {config.LOCK_FILE}')
finally:
- lock.release()
- return run
+ if lock:
+ lock.release()
+ return
diff --git a/turbinia/tcelery.py b/turbinia/tcelery.py
index 5eb741e9f..9d0b3b200 100644
--- a/turbinia/tcelery.py
+++ b/turbinia/tcelery.py
@@ -50,13 +50,18 @@ def setup(self):
self.app = celery.Celery(
'turbinia', broker=config.CELERY_BROKER, backend=config.CELERY_BACKEND)
self.app.conf.update(
+ accept_content=['json'],
broker_connection_retry_on_startup=True,
+        # Store Celery task result metadata
+ result_backend=config.CELERY_BACKEND,
task_default_queue=config.INSTANCE_ID,
- accept_content=['json'],
+      # Re-queue the task if the Celery worker exits abruptly
+ task_reject_on_worker_lost=True,
worker_cancel_long_running_tasks_on_connection_loss=True,
worker_concurrency=1,
worker_prefetch_multiplier=1,
- )
+      # Avoid re-executing tasks that already completed successfully
+ worker_deduplicate_successful_tasks=True)
class TurbiniaKombu(TurbiniaMessageBase):
diff --git a/turbinia/worker.py b/turbinia/worker.py
index 9728a54c3..338a2a78b 100644
--- a/turbinia/worker.py
+++ b/turbinia/worker.py
@@ -27,7 +27,6 @@
from turbinia import job_utils
from turbinia.lib import docker_manager
from turbinia.jobs import manager as job_manager
-from turbinia.tcelery import TurbiniaCelery
config.LoadConfig()
task_manager_type = config.TASK_MANAGER.lower()
@@ -249,7 +248,6 @@ def start(self):
# no apparent benefit from having this enabled at the moment.
self.worker.task(task_utils.task_runner, name='task_runner')
argv = [
- 'worker', '--loglevel=info', '--concurrency=1', '--without-gossip',
- '--without-mingle'
+ 'worker', '--loglevel=info', '--concurrency=1', '--without-gossip', '-E'
]
self.worker.start(argv)
diff --git a/turbinia/workers/__init__.py b/turbinia/workers/__init__.py
index 9affbbc3b..3b11d6ab9 100644
--- a/turbinia/workers/__init__.py
+++ b/turbinia/workers/__init__.py
@@ -454,7 +454,7 @@ class TurbiniaTask:
STORED_ATTRIBUTES = [
'id', 'job_id', 'job_name', 'start_time', 'last_update', 'name',
'evidence_name', 'evidence_id', 'request_id', 'requester', 'group_name',
- 'reason', 'group_id'
+ 'reason', 'group_id', 'celery_id'
]
# The list of evidence states that are required by a Task in order to run.
@@ -482,6 +482,7 @@ def __init__(
self.base_output_dir = config.OUTPUT_DIR
self.id = uuid.uuid4().hex
+ self.celery_id = None
self.is_finalize_task = False
self.job_id = None
self.job_name = None
@@ -507,7 +508,6 @@ def __init__(
self.group_name = group_name
self.reason = reason
self.group_id = group_id
- self.worker_name = platform.node()
def serialize(self):
"""Converts the TurbiniaTask object into a serializable dict.
@@ -1039,10 +1039,21 @@ def run_wrapper(self, evidence):
log.debug(f'Task {self.name:s} {self.id:s} awaiting execution')
failure_message = None
+ worker_name = platform.node()
try:
evidence = evidence_decode(evidence)
self.result = self.setup(evidence)
+      # Call update_task_status() to update the task status.
+      # We cannot call update_task() here because it would clobber data the
+      # Turbinia server stored when the task was created, data that is not
+      # present in the TurbiniaTask object the worker currently holds in
+      # memory.
self.update_task_status(self, 'queued')
+      # For the same reason, we perform a single attribute update for the
+      # worker name.
+ task_key = self.state_manager.redis_client.build_key_name('task', self.id)
+ self.state_manager.redis_client.set_attribute(
+ task_key, 'worker_name', json.dumps(worker_name))
turbinia_worker_tasks_queued_total.inc()
task_runtime_metrics = self.get_metrics()
except TurbiniaException as exception:
@@ -1097,7 +1108,8 @@ def run_wrapper(self, evidence):
self._evidence_config = evidence.config
self.task_config = self.get_task_recipe(evidence.config)
self.worker_start_time = datetime.now()
- updated_status = f'{self.id} is running on worker {self.worker_name}'
+      # Update the task status to record which worker is executing the task.
+ updated_status = f'Task is running on {worker_name}'
self.update_task_status(self, updated_status)
self.result = self.run(evidence, self.result)
@@ -1194,9 +1206,8 @@ def update_task_status(self, task, status=None):
status (str): Brief word or phrase for Task state. If not supplied, the
existing Task status will be used.
"""
- if status:
- task.status = 'Task {0!s} is {1!s} on {2!s}'.format(
- self.name, status, self.worker_name)
+ if not status:
+ return
if not self.state_manager:
self.state_manager = state_manager.get_state_manager()
if self.state_manager:
diff --git a/turbinia/workers/dfdewey.py b/turbinia/workers/dfdewey.py
index f25575222..7191d6175 100644
--- a/turbinia/workers/dfdewey.py
+++ b/turbinia/workers/dfdewey.py
@@ -91,7 +91,7 @@ def run(self, evidence, result):
result.log(status_summary)
else:
status_summary = (
- 'Not running dfDewey. Case was not provided in task config.')
+ 'dfDewey will not execute. Case was not provided in task config.')
result.log(status_summary)
result.close(self, success=success, status=status_summary)
diff --git a/turbinia/workers/fsstat.py b/turbinia/workers/fsstat.py
index 85b8abb8f..21a1f90a8 100644
--- a/turbinia/workers/fsstat.py
+++ b/turbinia/workers/fsstat.py
@@ -45,7 +45,7 @@ def run(self, evidence, result):
# Since fsstat does not support some filesystems, we won't run it when we
# know the partition is not supported.
elif evidence.path_spec.type_indicator in ("APFS", "XFS"):
- message = 'Not running fsstat since partition is not supported'
+ message = 'Not processing since partition is not supported'
result.log(message)
result.close(self, success=True, status=message)
else:
diff --git a/web/src/components/RequestDetails.vue b/web/src/components/RequestDetails.vue
index b5daf6a42..512b26be1 100644
--- a/web/src/components/RequestDetails.vue
+++ b/web/src/components/RequestDetails.vue
@@ -41,12 +41,16 @@ limitations under the License.