Skip to content

Commit

Permalink
feat(jobs): add resource name to extracted information for job resource
Browse files Browse the repository at this point in the history
- add `resource.name` to `_extract_resource`
- black formatting changes to `base.py` file
  • Loading branch information
lucasmarchd01 committed Jun 28, 2024
1 parent 8373467 commit f1b0042
Showing 1 changed file with 94 additions and 45 deletions.
139 changes: 94 additions & 45 deletions rodan-main/code/rodan/jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __new__(cls, clsname, bases, attrs):
module_name = attrs["__module__"]
if module_name.startswith("rodan.jobs."):
attrs["_package_name"] = (
"rodan.jobs." + module_name[len("rodan.jobs."):].split(".", 1)[0]
"rodan.jobs." + module_name[len("rodan.jobs.") :].split(".", 1)[0]
)
else:
if settings.TEST and module_name == "rodan.test.dummy_jobs":
Expand Down Expand Up @@ -102,12 +102,14 @@ def __init__(cls, clsname, bases, attrs):
if attrs.get("_abstract") is True:
return
else:

# Set base settings schema if they do not already exist in the job.
schema = attrs.get("settings", {"job_queue": "celery", "type": "object"})

if not Job.objects.filter(name=attrs["name"]).exists():
if (not getattr(settings, "_update_rodan_jobs", None) and not settings.TEST):
if (
not getattr(settings, "_update_rodan_jobs", None)
and not settings.TEST
):
raise ImproperlyConfigured(
(
"The catalogue of local jobs does not match the ones in "
Expand Down Expand Up @@ -135,7 +137,7 @@ def __init__(cls, clsname, bases, attrs):
job_queue=schema.get("job_queue", "celery"),
)
j.save()
#print(attrs["name"])
# print(attrs["name"])

try:
for ipt in attrs["input_port_types"]:
Expand Down Expand Up @@ -387,9 +389,18 @@ def check_port_types(which):
resource_types = RodanTaskType._resolve_resource_types(
attrs_pt["resource_types"]
)
rt_code = set(list(map(lambda rt: rt.mimetype, resource_types))) #map works differently in py2->3, need to add list
rt_code = set(
list(map(lambda rt: rt.mimetype, resource_types))
) # map works differently in py2->3, need to add list
rt_db = set(
list((map(lambda rt: rt.mimetype, pt.resource_types.all())))
list(
(
map(
lambda rt: rt.mimetype,
pt.resource_types.all(),
)
)
)
)
if rt_code != rt_db:
if not UPDATE_JOBS:
Expand Down Expand Up @@ -489,8 +500,10 @@ def check_port_types(which):
is_list=bool(pt.get("is_list", False)),
)
i.save()
resource_types = RodanTaskType._resolve_resource_types(
pt["resource_types"]
resource_types = (
RodanTaskType._resolve_resource_types(
pt["resource_types"]
)
)
if len(resource_types) == 0:
raise ValueError(
Expand Down Expand Up @@ -523,15 +536,17 @@ def _resolve_resource_types(value):
Returns a list of ResourceType objects.
"""
try:
mimelist = list(filter(
value, ResourceType.objects.all().values_list("mimetype", flat=True)
))
mimelist = list(
filter(
value, ResourceType.objects.all().values_list("mimetype", flat=True)
)
)
except TypeError:
mimelist = value
return ResourceType.objects.filter(mimetype__in=mimelist)


class RodanTask(Task,metaclass=RodanTaskType):
class RodanTask(Task, metaclass=RodanTaskType):
# __metaclass__ = RodanTaskType
abstract = True

Expand All @@ -552,6 +567,7 @@ def _extract_resource(resource, resource_type_mimetype=None):
"resource_type": str(
resource_type_mimetype or resource.resource_type.mimetype
),
"resource_name": str(resource.name),
}
if with_urls:
r["resource_url"] = str(resource.resource_url)
Expand All @@ -575,12 +591,14 @@ def _extract_resource(resource, resource_type_mimetype=None):
inputs[ipt_name].append(_extract_resource(input.resource))
elif input.resource_list is not None: # If resource_list
inputs[ipt_name].append(
list(map(
lambda x: _extract_resource(
x, input.resource_list.get_resource_type().mimetype
),
input.resource_list.resources.all(),
))
list(
map(
lambda x: _extract_resource(
x, input.resource_list.get_resource_type().mimetype
),
input.resource_list.resources.all(),
)
)
)
else:
raise RuntimeError(
Expand Down Expand Up @@ -694,8 +712,8 @@ def __init__(self, settings_update={}, response=None):
for k, v in settings_update.items():
if isinstance(k, str) and k.startswith("@"): # noqa
self.settings_update[k] = v
# this is not throwing error in rodan for python3

# this is not throwing error in rodan for python3

def tempdir(self):
"""
Expand Down Expand Up @@ -774,16 +792,24 @@ def run(self, runjob_id):
logger.info(("ran the task and the returned object is {0}").format(retval))

if isinstance(retval, self.WAITING_FOR_INPUT):
logger.info(("the settings_update field is: {0}").format(retval.settings_update))
logger.info(
("the settings_update field is: {0}").format(retval.settings_update)
)
try:
if type(retval.settings_update["@settings"]) == bytes:
retval.settings_update["@settings"] = retval.settings_update["@settings"].decode("UTF-8")
retval.settings_update["@settings"] = retval.settings_update[
"@settings"
].decode("UTF-8")
except KeyError:
pass
settings.update(retval.settings_update)
logger.info(("After being updated the settings_update field is: {0}").format(retval.settings_update))
logger.info(
("After being updated the settings_update field is: {0}").format(
retval.settings_update
)
)

# for python3 we have to use decode utf 8 for jason format
# for python3 we have to use decode utf 8 for jason format
# for the last step of the biollante job
# first iteration the updated version is the same as the initial version
# encoded again? biollante is working?
Expand Down Expand Up @@ -817,9 +843,16 @@ def run(self, runjob_id):
and user.user_preference.send_email
):
to = [user.email]
email_template = "emails/workflow_run_waiting_for_user_input.html"
context = {"name": workflow_run.name, "description": workflow_run.description}
registry.tasks["rodan.core.send_templated_email"].apply_async((to, email_template, context))
email_template = (
"emails/workflow_run_waiting_for_user_input.html"
)
context = {
"name": workflow_run.name,
"description": workflow_run.description,
}
registry.tasks["rodan.core.send_templated_email"].apply_async(
(to, email_template, context)
)

return "WAITING FOR INPUT"
else:
Expand Down Expand Up @@ -938,45 +971,56 @@ def run(self, runjob_id):
# Update workflow run description with job info
wall_time = time.time() - start_time
try:
snapshot_info = "\n\n{0}:\n name: \"{1}\"\n wall_time: \"{2}\"\n".format(
str(runjob.uuid),
runjob.job_name,
time.strftime("%H:%M:%S", time.gmtime(wall_time))
snapshot_info = (
'\n\n{0}:\n name: "{1}"\n wall_time: "{2}"\n'.format(
str(runjob.uuid),
runjob.job_name,
time.strftime("%H:%M:%S", time.gmtime(wall_time)),
)
)

if len(settings) > 0:
snapshot_info += " settings:\n"
for key, value in settings.iteritems():
snapshot_info += " {0}: {1}\n".format(str(key), str(value))
snapshot_info += " {0}: {1}\n".format(
str(key), str(value)
)

input_qs = Input.objects.filter(run_job=runjob)
if input_qs.count() > 0:
snapshot_info += " inputs:\n"
for input in input_qs:
snapshot_info += " - uuid: {0}\n" \
.format(str(input.resource.uuid))
snapshot_info += " name: \"{0}\"\n" \
.format(input.resource.name)
snapshot_info += " - uuid: {0}\n".format(
str(input.resource.uuid)
)
snapshot_info += ' name: "{0}"\n'.format(
input.resource.name
)

output_qs = Output.objects.filter(run_job=runjob)
if output_qs.count() > 0:
snapshot_info += " outputs:\n"
for output in Output.objects.filter(run_job=runjob):
snapshot_info += " - uuid: {0}\n" \
.format(str(output.resource.uuid))
snapshot_info += " name: \"{0}\"\n" \
.format(input.resource.name)
snapshot_info += " - uuid: {0}\n".format(
str(output.resource.uuid)
)
snapshot_info += ' name: "{0}"\n'.format(
input.resource.name
)

snapshot_info += "\n"

with transaction.atomic():
atomic_wfrun = WorkflowRun.objects.select_for_update() \
.get(uuid=runjob.workflow_run.uuid)
atomic_wfrun = WorkflowRun.objects.select_for_update().get(
uuid=runjob.workflow_run.uuid
)
if atomic_wfrun.description is None:
atomic_wfrun.description = ""
atomic_wfrun.description += snapshot_info
atomic_wfrun.save(update_fields=["description"])
except AttributeError: # This happens during tests where not all fields are set
except (
AttributeError
): # This happens during tests where not all fields are set
pass
except Exception as e:
print(e)
Expand Down Expand Up @@ -1016,8 +1060,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
):
to = [user.email]
email_template = "rodan/email/workflow_run_failed.html"
context = { "name": workflow_run.name, "description": workflow_run.description}
registry.tasks["rodan.core.send_templated_email"].apply_async((to, email_template, context))
context = {
"name": workflow_run.name,
"description": workflow_run.description,
}
registry.tasks["rodan.core.send_templated_email"].apply_async(
(to, email_template, context)
)

def _add_error_information_to_runjob(self, exc, einfo):
# Any job using the default_on_failure method can define an error_information
Expand Down

0 comments on commit f1b0042

Please sign in to comment.