diff --git a/rodan-main/code/rodan/jobs/base.py b/rodan-main/code/rodan/jobs/base.py index b859fba2..6ec4e103 100644 --- a/rodan-main/code/rodan/jobs/base.py +++ b/rodan-main/code/rodan/jobs/base.py @@ -66,7 +66,7 @@ def __new__(cls, clsname, bases, attrs): module_name = attrs["__module__"] if module_name.startswith("rodan.jobs."): attrs["_package_name"] = ( - "rodan.jobs." + module_name[len("rodan.jobs."):].split(".", 1)[0] + "rodan.jobs." + module_name[len("rodan.jobs.") :].split(".", 1)[0] ) else: if settings.TEST and module_name == "rodan.test.dummy_jobs": @@ -102,12 +102,14 @@ def __init__(cls, clsname, bases, attrs): if attrs.get("_abstract") is True: return else: - # Set base settings schema if they do not already exist in the job. schema = attrs.get("settings", {"job_queue": "celery", "type": "object"}) if not Job.objects.filter(name=attrs["name"]).exists(): - if (not getattr(settings, "_update_rodan_jobs", None) and not settings.TEST): + if ( + not getattr(settings, "_update_rodan_jobs", None) + and not settings.TEST + ): raise ImproperlyConfigured( ( "The catalogue of local jobs does not match the ones in " @@ -135,7 +137,7 @@ def __init__(cls, clsname, bases, attrs): job_queue=schema.get("job_queue", "celery"), ) j.save() - #print(attrs["name"]) + # print(attrs["name"]) try: for ipt in attrs["input_port_types"]: @@ -387,9 +389,18 @@ def check_port_types(which): resource_types = RodanTaskType._resolve_resource_types( attrs_pt["resource_types"] ) - rt_code = set(list(map(lambda rt: rt.mimetype, resource_types))) #map works differently in py2->3, need to add list + rt_code = set( + list(map(lambda rt: rt.mimetype, resource_types)) + ) # map works differently in py2->3, need to add list rt_db = set( - list((map(lambda rt: rt.mimetype, pt.resource_types.all()))) + list( + ( + map( + lambda rt: rt.mimetype, + pt.resource_types.all(), + ) + ) + ) ) if rt_code != rt_db: if not UPDATE_JOBS: @@ -489,8 +500,10 @@ def check_port_types(which): is_list=bool(pt.get("is_list", False)), ) i.save() - resource_types = RodanTaskType._resolve_resource_types( - pt["resource_types"] + resource_types = ( + RodanTaskType._resolve_resource_types( + pt["resource_types"] + ) ) if len(resource_types) == 0: raise ValueError( @@ -523,15 +536,17 @@ def _resolve_resource_types(value): Returns a list of ResourceType objects. """ try: - mimelist = list(filter( - value, ResourceType.objects.all().values_list("mimetype", flat=True) - )) + mimelist = list( + filter( + value, ResourceType.objects.all().values_list("mimetype", flat=True) + ) + ) except TypeError: mimelist = value return ResourceType.objects.filter(mimetype__in=mimelist) -class RodanTask(Task,metaclass=RodanTaskType): +class RodanTask(Task, metaclass=RodanTaskType): # __metaclass__ = RodanTaskType abstract = True @@ -552,6 +567,7 @@ def _extract_resource(resource, resource_type_mimetype=None): "resource_type": str( resource_type_mimetype or resource.resource_type.mimetype ), + "resource_name": str(resource.name), } if with_urls: r["resource_url"] = str(resource.resource_url) @@ -575,12 +591,14 @@ def _extract_resource(resource, resource_type_mimetype=None): inputs[ipt_name].append(_extract_resource(input.resource)) elif input.resource_list is not None: # If resource_list inputs[ipt_name].append( - list(map( - lambda x: _extract_resource( - x, input.resource_list.get_resource_type().mimetype - ), - input.resource_list.resources.all(), - )) + list( + map( + lambda x: _extract_resource( + x, input.resource_list.get_resource_type().mimetype + ), + input.resource_list.resources.all(), + ) + ) ) else: raise RuntimeError( @@ -694,8 +712,8 @@ def __init__(self, settings_update={}, response=None): for k, v in settings_update.items(): if isinstance(k, str) and k.startswith("@"): # noqa self.settings_update[k] = v - - # this is not throwing error in rodan for python3 + + # this is not throwing error in rodan for python3 def tempdir(self): """ @@ -774,16 +792,24 @@ def run(self, runjob_id): logger.info(("ran the task and the returned object is {0}").format(retval)) if isinstance(retval, self.WAITING_FOR_INPUT): - logger.info(("the settings_update field is: {0}").format(retval.settings_update)) + logger.info( + ("the settings_update field is: {0}").format(retval.settings_update) + ) try: if type(retval.settings_update["@settings"]) == bytes: - retval.settings_update["@settings"] = retval.settings_update["@settings"].decode("UTF-8") + retval.settings_update["@settings"] = retval.settings_update[ + "@settings" + ].decode("UTF-8") except KeyError: pass settings.update(retval.settings_update) - logger.info(("After being updated the settings_update field is: {0}").format(retval.settings_update)) + logger.info( + ("After being updated the settings_update field is: {0}").format( + retval.settings_update + ) + ) - # for python3 we have to use decode utf 8 for jason format + # for python3 we have to use decode utf 8 for jason format # for the last step of the biollante job # first iteration the updated version is the same as the initial version # encoded again? biollante is working? @@ -817,9 +843,16 @@ def run(self, runjob_id): and user.user_preference.send_email ): to = [user.email] - email_template = "emails/workflow_run_waiting_for_user_input.html" - context = {"name": workflow_run.name, "description": workflow_run.description} - registry.tasks["rodan.core.send_templated_email"].apply_async((to, email_template, context)) + email_template = ( + "emails/workflow_run_waiting_for_user_input.html" + ) + context = { + "name": workflow_run.name, + "description": workflow_run.description, + } + registry.tasks["rodan.core.send_templated_email"].apply_async( + (to, email_template, context) + ) return "WAITING FOR INPUT" else: @@ -938,45 +971,56 @@ def run(self, runjob_id): # Update workflow run description with job info wall_time = time.time() - start_time try: - snapshot_info = "\n\n{0}:\n name: \"{1}\"\n wall_time: \"{2}\"\n".format( - str(runjob.uuid), - runjob.job_name, - time.strftime("%H:%M:%S", time.gmtime(wall_time)) + snapshot_info = ( + '\n\n{0}:\n name: "{1}"\n wall_time: "{2}"\n'.format( + str(runjob.uuid), + runjob.job_name, + time.strftime("%H:%M:%S", time.gmtime(wall_time)), + ) ) if len(settings) > 0: snapshot_info += " settings:\n" for key, value in settings.iteritems(): - snapshot_info += " {0}: {1}\n".format(str(key), str(value)) + snapshot_info += " {0}: {1}\n".format( + str(key), str(value) + ) input_qs = Input.objects.filter(run_job=runjob) if input_qs.count() > 0: snapshot_info += " inputs:\n" for input in input_qs: - snapshot_info += " - uuid: {0}\n" \ - .format(str(input.resource.uuid)) - snapshot_info += " name: \"{0}\"\n" \ - .format(input.resource.name) + snapshot_info += " - uuid: {0}\n".format( + str(input.resource.uuid) + ) + snapshot_info += ' name: "{0}"\n'.format( + input.resource.name + ) output_qs = Output.objects.filter(run_job=runjob) if output_qs.count() > 0: snapshot_info += " outputs:\n" for output in Output.objects.filter(run_job=runjob): - snapshot_info += " - uuid: {0}\n" \ - .format(str(output.resource.uuid)) - snapshot_info += " name: \"{0}\"\n" \ - .format(input.resource.name) + snapshot_info += " - uuid: {0}\n".format( + str(output.resource.uuid) + ) + snapshot_info += ' name: "{0}"\n'.format( + input.resource.name + ) snapshot_info += "\n" with transaction.atomic(): - atomic_wfrun = WorkflowRun.objects.select_for_update() \ - .get(uuid=runjob.workflow_run.uuid) + atomic_wfrun = WorkflowRun.objects.select_for_update().get( + uuid=runjob.workflow_run.uuid + ) if atomic_wfrun.description is None: atomic_wfrun.description = "" atomic_wfrun.description += snapshot_info atomic_wfrun.save(update_fields=["description"]) - except AttributeError: # This happens during tests where not all fields are set + except ( + AttributeError + ): # This happens during tests where not all fields are set pass except Exception as e: print(e) @@ -1016,8 +1060,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo): ): to = [user.email] email_template = "rodan/email/workflow_run_failed.html" - context = { "name": workflow_run.name, "description": workflow_run.description} - registry.tasks["rodan.core.send_templated_email"].apply_async((to, email_template, context)) + context = { + "name": workflow_run.name, + "description": workflow_run.description, + } + registry.tasks["rodan.core.send_templated_email"].apply_async( + (to, email_template, context) + ) def _add_error_information_to_runjob(self, exc, einfo): # Any job using the default_on_failure method can define an error_information