diff --git a/turbinia/api/cli/turbinia_client/core/groups.py b/turbinia/api/cli/turbinia_client/core/groups.py index 2640297a7..25d377c30 100644 --- a/turbinia/api/cli/turbinia_client/core/groups.py +++ b/turbinia/api/cli/turbinia_client/core/groups.py @@ -87,4 +87,3 @@ def submit_group(ctx: click.Context): of available evidence types. """ ctx.invoke(setup_submit) - click.echo(submit_group.get_help(ctx)) diff --git a/turbinia/evidence.py b/turbinia/evidence.py index 5a8b8a4bc..985609812 100644 --- a/turbinia/evidence.py +++ b/turbinia/evidence.py @@ -533,7 +533,7 @@ def preprocess(self, task_id, tmp_dir=None, required_states=None): if self.resource_tracked: # Track resource and task id in state file log.debug( - 'Evidence {0:s} is resource tracked. Acquiring filelock for' + 'Evidence {0:s} is resource tracked. Acquiring filelock for ' 'preprocessing'.format(self.name)) with filelock.FileLock(config.RESOURCE_FILE_LOCK): resource_manager.PreprocessResourceState(self.resource_id, task_id) @@ -565,7 +565,7 @@ def postprocess(self, task_id): if self.resource_tracked: log.debug( - 'Evidence: {0:s} is resource tracked. Acquiring filelock for ' + 'Evidence {0:s} is resource tracked. Acquiring filelock for ' 'postprocessing.'.format(self.name)) with filelock.FileLock(config.RESOURCE_FILE_LOCK): # Run postprocess to either remove task_id or resource_id. diff --git a/turbinia/state_manager.py b/turbinia/state_manager.py index 48e24220d..791f6b118 100644 --- a/turbinia/state_manager.py +++ b/turbinia/state_manager.py @@ -292,10 +292,11 @@ def update_request_task(self, task) -> None: self.redis_client.add_to_list(request_key, 'failed_tasks', task.id) statuses_to_remove.remove('failed_tasks') task_status = self.redis_client.get_attribute(task_key, 'status') - if task_status == 'running': + if task_status and 'running' in task_status: self.redis_client.add_to_list(request_key, 'running_tasks', task.id) statuses_to_remove.remove('running_tasks') - elif task_status is None or task_status == 'queued': + elif (task_status is None or task_status == 'queued' or + task_status == 'pending'): self.redis_client.add_to_list(request_key, 'queued_tasks', task.id) statuses_to_remove.remove('queued_tasks') for status_name in statuses_to_remove: diff --git a/turbinia/task_manager.py b/turbinia/task_manager.py index 219ac3cac..f2e36d4d5 100644 --- a/turbinia/task_manager.py +++ b/turbinia/task_manager.py @@ -37,6 +37,7 @@ if config.TASK_MANAGER.lower() == 'celery': from celery import states as celery_states + from kombu import exceptions as kombu_exceptions from turbinia import tcelery as turbinia_celery log = logging.getLogger(__name__) @@ -514,8 +515,13 @@ def process_result(self, task_result: workers.TurbiniaTaskResult): else: log.info( f'Task {task_result.task_id} {task_result.task_name} ' - f'from {task_result.worker_name} executed with status [{task_result.status}]' - ) + f'from {task_result.worker_name} executed with status ' + f'[{task_result.status}]') + + task_key = self.state_manager.redis_client.build_key_name( + 'task', task_result.id) + self.state_manager.redis_client.set_attribute( + task_key, 'successful', 'false') if not isinstance(task_result.evidence, list): log.warning( @@ -613,6 +619,21 @@ def run(self, under_test=False): time.sleep(config.SLEEP_TIME) + def close_failed_task(self, task): + result = workers.TurbiniaTaskResult( + request_id=task.request_id, no_output_manager=True, + no_state_manager=True) + result.setup(task) + if task.stub.traceback: + result.status = ( + f'Task {task.id} failed with exception: {task.stub.traceback}') + else: + result.status = f'Task {task.id} failed.' + result.successful = False + result.closed = True + task.result = result + return task + def timeout_task(self, task, timeout): """Sets status and result data for timed out Task. @@ -680,15 +701,19 @@ def process_tasks(self): check_timeout = True elif celery_task.status == celery_states.STARTED: log.debug(f'Task {celery_task.id:s} not finished.') + # set status here too check_timeout = True elif celery_task.status == celery_states.FAILURE: log.warning(f'Task {celery_task.id:s} failed.') + self.close_failed_task(task) completed_tasks.append(task) elif celery_task.status == celery_states.SUCCESS: task.result = workers.TurbiniaTaskResult.deserialize(celery_task.result) completed_tasks.append(task) elif celery_task.status == celery_states.PENDING: task.status = 'pending' + # set status here too + check_timeout = True log.debug(f'Task {celery_task.id:s} status pending.') elif celery_task.status == celery_states.REVOKED: message = ( @@ -777,7 +802,14 @@ def enqueue_task(self, task, evidence_, timeout): self.celery_runner.task_time_limit = celery_hard_timeout # Time limits described here: # https://docs.celeryq.dev/en/stable/userguide/workers.html#time-limits - task.stub = self.celery_runner.apply_async( - (task.serialize(), evidence_.serialize()), retry=False, - soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout, - expires=config.CELERY_TASK_EXPIRATION_TIME) + try: + task.stub = self.celery_runner.apply_async( + (task.serialize(), evidence_.serialize()), retry=True, + soft_time_limit=celery_soft_timeout, time_limit=celery_hard_timeout, + expires=config.CELERY_TASK_EXPIRATION_TIME) + # Save the celery task identifier for traceability between + # Turbinia tasks and Celery tasks. + task.celery_id = task.stub.id + self.state_manager.update_task(task) + except kombu_exceptions.OperationalError as exception: + log.error(f'Error queueing task: {exception}') diff --git a/turbinia/task_utils.py b/turbinia/task_utils.py index 0346d985d..d3ca95660 100644 --- a/turbinia/task_utils.py +++ b/turbinia/task_utils.py @@ -189,6 +189,9 @@ def task_deserialize(input_dict): def task_runner(obj, *args, **kwargs): """Wrapper function to run specified TurbiniaTask object. + This is the method Celery tasks will execute. Any Python exceptions + raised from this method will cause the Celery task to fail. + Args: obj: An instantiated TurbiniaTask object. *args: Any Args to pass to obj. @@ -198,14 +201,15 @@ def task_runner(obj, *args, **kwargs): Output from TurbiniaTask (should be TurbiniaTaskResult). """ obj = task_deserialize(obj) - # Celery is configured to receive only one Task per worker - # so no need to create a FileLock. + lock = None try: lock = filelock.FileLock(config.LOCK_FILE) with lock.acquire(timeout=10): run = obj.run_wrapper(*args, **kwargs) + return run except filelock.Timeout: - raise TurbiniaException(f'Could not acquire lock on {config.LOCK_FILE}') + log.error(f'Could not acquire lock on {config.LOCK_FILE}') finally: - lock.release() - return run + if lock: + lock.release() + return diff --git a/turbinia/tcelery.py b/turbinia/tcelery.py index 5eb741e9f..07eb3e2a1 100644 --- a/turbinia/tcelery.py +++ b/turbinia/tcelery.py @@ -56,7 +56,9 @@ def setup(self): worker_cancel_long_running_tasks_on_connection_loss=True, worker_concurrency=1, worker_prefetch_multiplier=1, - ) + task_acks_late=True, # ack task after execution + task_reject_on_worker_lost=True, # Re-queue task if celery worker abruptly exists + worker_deduplicate_successful_tasks=True) # avoid task duplication class TurbiniaKombu(TurbiniaMessageBase): diff --git a/turbinia/worker.py b/turbinia/worker.py index 9728a54c3..338a2a78b 100644 --- a/turbinia/worker.py +++ b/turbinia/worker.py @@ -27,7 +27,6 @@ from turbinia import job_utils from turbinia.lib import docker_manager from turbinia.jobs import manager as job_manager -from turbinia.tcelery import TurbiniaCelery config.LoadConfig() task_manager_type = config.TASK_MANAGER.lower() @@ -249,7 +248,6 @@ def start(self): # no apparent benefit from having this enabled at the moment. self.worker.task(task_utils.task_runner, name='task_runner') argv = [ - 'worker', '--loglevel=info', '--concurrency=1', '--without-gossip', - '--without-mingle' + 'worker', '--loglevel=info', '--concurrency=1', '--without-gossip', '-E' ] self.worker.start(argv) diff --git a/turbinia/workers/__init__.py b/turbinia/workers/__init__.py index 9affbbc3b..f745ee9e2 100644 --- a/turbinia/workers/__init__.py +++ b/turbinia/workers/__init__.py @@ -454,7 +454,7 @@ class TurbiniaTask: STORED_ATTRIBUTES = [ 'id', 'job_id', 'job_name', 'start_time', 'last_update', 'name', 'evidence_name', 'evidence_id', 'request_id', 'requester', 'group_name', - 'reason', 'group_id' + 'reason', 'group_id', 'celery_id' ] # The list of evidence states that are required by a Task in order to run. @@ -482,6 +482,7 @@ def __init__( self.base_output_dir = config.OUTPUT_DIR self.id = uuid.uuid4().hex + self.celery_id = None self.is_finalize_task = False self.job_id = None self.job_name = None @@ -507,7 +508,6 @@ def __init__( self.group_name = group_name self.reason = reason self.group_id = group_id - self.worker_name = platform.node() def serialize(self): """Converts the TurbiniaTask object into a serializable dict. @@ -1097,7 +1097,13 @@ def run_wrapper(self, evidence): self._evidence_config = evidence.config self.task_config = self.get_task_recipe(evidence.config) self.worker_start_time = datetime.now() - updated_status = f'{self.id} is running on worker {self.worker_name}' + # Update task status so we know which worker the task executed on. + worker_name = platform.node() + updated_status = f'Task is running on {worker_name}' + task_key = self.state_manager.redis_client.build_key_name( + 'task', self.id) + self.state_manager.redis_client.set_attribute( + task_key, 'worker_name', json.dumps(worker_name)) self.update_task_status(self, updated_status) self.result = self.run(evidence, self.result) @@ -1194,9 +1200,8 @@ def update_task_status(self, task, status=None): status (str): Brief word or phrase for Task state. If not supplied, the existing Task status will be used. """ - if status: - task.status = 'Task {0!s} is {1!s} on {2!s}'.format( - self.name, status, self.worker_name) + if not status: + return if not self.state_manager: self.state_manager = state_manager.get_state_manager() if self.state_manager: diff --git a/web/src/components/RequestDetails.vue b/web/src/components/RequestDetails.vue index b5daf6a42..512b26be1 100644 --- a/web/src/components/RequestDetails.vue +++ b/web/src/components/RequestDetails.vue @@ -41,12 +41,16 @@ limitations under the License. Request {{ requestDetails.request_id }} has {{ requestDetails.task_count - - requestDetails.successful_tasks - requestDetails.failed_tasks }} Tasks remaining. + requestDetails.successful_tasks - requestDetails.failed_tasks }} Tasks remaining. Request {{ requestDetails.request_id }} completed with {{ requestDetails.failed_tasks }} failed Tasks. + + Request {{ requestDetails.request_id }} has {{ requestDetails.queued_tasks + }} Tasks pending. + Request {{ requestDetails.request_id }} was not successful. diff --git a/web/src/components/TaskDetails.vue b/web/src/components/TaskDetails.vue index 31c6095f1..ff1fd1eab 100644 --- a/web/src/components/TaskDetails.vue +++ b/web/src/components/TaskDetails.vue @@ -40,7 +40,10 @@ limitations under the License. {{ taskDetails.status }} - {{ taskDetails.status }} +
+ {{ taskDetails.status }} +
+
Task {{ taskDetails.id }} is pending
{{ taskDetails.status }} @@ -63,6 +66,12 @@ limitations under the License.
N/A
+ +
+ {{ taskDetails.celery_id }} +
+
N/A
+
{{ taskDetails.evidence_id }} @@ -107,10 +116,10 @@ limitations under the License. - + Evidence output is downloading... - + Evidence type is not supported for downloading.
@@ -140,7 +149,8 @@ limitations under the License.