diff --git a/turbinia/state_manager.py b/turbinia/state_manager.py index afee47505..416ea4a94 100644 --- a/turbinia/state_manager.py +++ b/turbinia/state_manager.py @@ -616,7 +616,8 @@ def get_request_status(self, request_id): request_status = 'successful' elif len(request_data['task_ids']) == len(request_data['failed_tasks']): request_status = 'failed' - elif len(request_data['running_tasks']) > 0: + elif len(request_data['running_tasks']) > 0 or len( + request_data['queued_tasks']) > 0: request_status = 'running' elif len(request_data['failed_tasks']) > 0 and all_tasks_finished: request_status = 'completed_with_errors' diff --git a/turbinia/task_manager.py b/turbinia/task_manager.py index cefaac308..67347c845 100644 --- a/turbinia/task_manager.py +++ b/turbinia/task_manager.py @@ -713,7 +713,11 @@ def process_tasks(self): elif celery_task.status == celery_states.PENDING: task.status = 'pending' check_timeout = True - log.debug(f'Task {celery_task.id:s} status pending.') + log.debug(f'Task {celery_task.id:s} is pending.') + elif celery_task.status == celery_states.RECEIVED: + task.status = 'queued' + check_timeout = True + log.debug(f'Task {celery_task.id:s} is queued.') elif celery_task.status == celery_states.REVOKED: message = ( f'Celery task {celery_task.id:s} associated with Turbinia ' diff --git a/turbinia/workers/__init__.py b/turbinia/workers/__init__.py index f745ee9e2..a30572b8a 100644 --- a/turbinia/workers/__init__.py +++ b/turbinia/workers/__init__.py @@ -1039,10 +1039,14 @@ def run_wrapper(self, evidence): log.debug(f'Task {self.name:s} {self.id:s} awaiting execution') failure_message = None + worker_name = platform.node() try: evidence = evidence_decode(evidence) self.result = self.setup(evidence) self.update_task_status(self, 'queued') + task_key = self.state_manager.redis_client.build_key_name('task', self.id) + self.state_manager.redis_client.set_attribute( + task_key, 'worker_name', json.dumps(worker_name)) turbinia_worker_tasks_queued_total.inc() task_runtime_metrics = self.get_metrics() except TurbiniaException as exception: @@ -1098,12 +1102,7 @@ def run_wrapper(self, evidence): self.task_config = self.get_task_recipe(evidence.config) self.worker_start_time = datetime.now() # Update task status so we know which worker the task executed on. - worker_name = platform.node() updated_status = f'Task is running on {worker_name}' - task_key = self.state_manager.redis_client.build_key_name( - 'task', self.id) - self.state_manager.redis_client.set_attribute( - task_key, 'worker_name', json.dumps(worker_name)) self.update_task_status(self, updated_status) self.result = self.run(evidence, self.result) diff --git a/web/src/components/TaskDetails.vue b/web/src/components/TaskDetails.vue index ff1fd1eab..b217c74ca 100644 --- a/web/src/components/TaskDetails.vue +++ b/web/src/components/TaskDetails.vue @@ -164,6 +164,9 @@ limitations under the License.
{{ taskDetails.successful }}
+
+ False +
N/A
diff --git a/web/src/components/TaskList.vue b/web/src/components/TaskList.vue index 1ff848c56..0349a5716 100644 --- a/web/src/components/TaskList.vue +++ b/web/src/components/TaskList.vue @@ -78,6 +78,9 @@ export default { if (taskStatusTemp === null || taskStatusTemp === "pending") { taskStatusTemp = 'is pending on server.' } + else if (taskStatusTemp == "queued") { + taskStatusTemp = 'is queued for execution.' + } if (this.filterJobs.length > 0) { let jobName = task_dict.job_name.toLowerCase() if (this.radioFilter && !this.filterJobs.includes(jobName)) {