Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add C Clef Extraction Job #1186

Merged
merged 4 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 94 additions & 45 deletions rodan-main/code/rodan/jobs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def __new__(cls, clsname, bases, attrs):
module_name = attrs["__module__"]
if module_name.startswith("rodan.jobs."):
attrs["_package_name"] = (
"rodan.jobs." + module_name[len("rodan.jobs."):].split(".", 1)[0]
"rodan.jobs." + module_name[len("rodan.jobs.") :].split(".", 1)[0]
)
else:
if settings.TEST and module_name == "rodan.test.dummy_jobs":
Expand Down Expand Up @@ -102,12 +102,14 @@ def __init__(cls, clsname, bases, attrs):
if attrs.get("_abstract") is True:
return
else:

# Set base settings schema if they do not already exist in the job.
schema = attrs.get("settings", {"job_queue": "celery", "type": "object"})

if not Job.objects.filter(name=attrs["name"]).exists():
if (not getattr(settings, "_update_rodan_jobs", None) and not settings.TEST):
if (
not getattr(settings, "_update_rodan_jobs", None)
and not settings.TEST
):
raise ImproperlyConfigured(
(
"The catalogue of local jobs does not match the ones in "
Expand Down Expand Up @@ -135,7 +137,7 @@ def __init__(cls, clsname, bases, attrs):
job_queue=schema.get("job_queue", "celery"),
)
j.save()
#print(attrs["name"])
# print(attrs["name"])

try:
for ipt in attrs["input_port_types"]:
Expand Down Expand Up @@ -387,9 +389,18 @@ def check_port_types(which):
resource_types = RodanTaskType._resolve_resource_types(
attrs_pt["resource_types"]
)
rt_code = set(list(map(lambda rt: rt.mimetype, resource_types))) #map works differently in py2->3, need to add list
rt_code = set(
list(map(lambda rt: rt.mimetype, resource_types))
) # map works differently in py2->3, need to add list
rt_db = set(
list((map(lambda rt: rt.mimetype, pt.resource_types.all())))
list(
(
map(
lambda rt: rt.mimetype,
pt.resource_types.all(),
)
)
)
)
if rt_code != rt_db:
if not UPDATE_JOBS:
Expand Down Expand Up @@ -489,8 +500,10 @@ def check_port_types(which):
is_list=bool(pt.get("is_list", False)),
)
i.save()
resource_types = RodanTaskType._resolve_resource_types(
pt["resource_types"]
resource_types = (
RodanTaskType._resolve_resource_types(
pt["resource_types"]
)
)
if len(resource_types) == 0:
raise ValueError(
Expand Down Expand Up @@ -523,15 +536,17 @@ def _resolve_resource_types(value):
Returns a list of ResourceType objects.
"""
try:
mimelist = list(filter(
value, ResourceType.objects.all().values_list("mimetype", flat=True)
))
mimelist = list(
filter(
value, ResourceType.objects.all().values_list("mimetype", flat=True)
)
)
except TypeError:
mimelist = value
return ResourceType.objects.filter(mimetype__in=mimelist)


class RodanTask(Task,metaclass=RodanTaskType):
class RodanTask(Task, metaclass=RodanTaskType):
# __metaclass__ = RodanTaskType
abstract = True

Expand All @@ -552,6 +567,7 @@ def _extract_resource(resource, resource_type_mimetype=None):
"resource_type": str(
resource_type_mimetype or resource.resource_type.mimetype
),
"resource_name": str(resource.name),
}
if with_urls:
r["resource_url"] = str(resource.resource_url)
Expand All @@ -575,12 +591,14 @@ def _extract_resource(resource, resource_type_mimetype=None):
inputs[ipt_name].append(_extract_resource(input.resource))
elif input.resource_list is not None: # If resource_list
inputs[ipt_name].append(
list(map(
lambda x: _extract_resource(
x, input.resource_list.get_resource_type().mimetype
),
input.resource_list.resources.all(),
))
list(
map(
lambda x: _extract_resource(
x, input.resource_list.get_resource_type().mimetype
),
input.resource_list.resources.all(),
)
)
)
else:
raise RuntimeError(
Expand Down Expand Up @@ -694,8 +712,8 @@ def __init__(self, settings_update={}, response=None):
for k, v in settings_update.items():
if isinstance(k, str) and k.startswith("@"): # noqa
self.settings_update[k] = v
# this is not throwing error in rodan for python3

# this is not throwing error in rodan for python3

def tempdir(self):
"""
Expand Down Expand Up @@ -774,16 +792,24 @@ def run(self, runjob_id):
logger.info(("ran the task and the returned object is {0}").format(retval))

if isinstance(retval, self.WAITING_FOR_INPUT):
logger.info(("the settings_update field is: {0}").format(retval.settings_update))
logger.info(
("the settings_update field is: {0}").format(retval.settings_update)
)
try:
if type(retval.settings_update["@settings"]) == bytes:
retval.settings_update["@settings"] = retval.settings_update["@settings"].decode("UTF-8")
retval.settings_update["@settings"] = retval.settings_update[
"@settings"
].decode("UTF-8")
except KeyError:
pass
settings.update(retval.settings_update)
logger.info(("After being updated the settings_update field is: {0}").format(retval.settings_update))
logger.info(
("After being updated the settings_update field is: {0}").format(
retval.settings_update
)
)

# for python3 we have to decode as UTF-8 for the JSON format
# for python3 we have to decode as UTF-8 for the JSON format
# for the last step of the biollante job
# first iteration the updated version is the same as the initial version
# encoded again? biollante is working?
Expand Down Expand Up @@ -817,9 +843,16 @@ def run(self, runjob_id):
and user.user_preference.send_email
):
to = [user.email]
email_template = "emails/workflow_run_waiting_for_user_input.html"
context = {"name": workflow_run.name, "description": workflow_run.description}
registry.tasks["rodan.core.send_templated_email"].apply_async((to, email_template, context))
email_template = (
"emails/workflow_run_waiting_for_user_input.html"
)
context = {
"name": workflow_run.name,
"description": workflow_run.description,
}
registry.tasks["rodan.core.send_templated_email"].apply_async(
(to, email_template, context)
)

return "WAITING FOR INPUT"
else:
Expand Down Expand Up @@ -938,45 +971,56 @@ def run(self, runjob_id):
# Update workflow run description with job info
wall_time = time.time() - start_time
try:
snapshot_info = "\n\n{0}:\n name: \"{1}\"\n wall_time: \"{2}\"\n".format(
str(runjob.uuid),
runjob.job_name,
time.strftime("%H:%M:%S", time.gmtime(wall_time))
snapshot_info = (
'\n\n{0}:\n name: "{1}"\n wall_time: "{2}"\n'.format(
str(runjob.uuid),
runjob.job_name,
time.strftime("%H:%M:%S", time.gmtime(wall_time)),
)
)

if len(settings) > 0:
snapshot_info += " settings:\n"
for key, value in settings.iteritems():
snapshot_info += " {0}: {1}\n".format(str(key), str(value))
snapshot_info += " {0}: {1}\n".format(
str(key), str(value)
)

input_qs = Input.objects.filter(run_job=runjob)
if input_qs.count() > 0:
snapshot_info += " inputs:\n"
for input in input_qs:
snapshot_info += " - uuid: {0}\n" \
.format(str(input.resource.uuid))
snapshot_info += " name: \"{0}\"\n" \
.format(input.resource.name)
snapshot_info += " - uuid: {0}\n".format(
str(input.resource.uuid)
)
snapshot_info += ' name: "{0}"\n'.format(
input.resource.name
)

output_qs = Output.objects.filter(run_job=runjob)
if output_qs.count() > 0:
snapshot_info += " outputs:\n"
for output in Output.objects.filter(run_job=runjob):
snapshot_info += " - uuid: {0}\n" \
.format(str(output.resource.uuid))
snapshot_info += " name: \"{0}\"\n" \
.format(input.resource.name)
snapshot_info += " - uuid: {0}\n".format(
str(output.resource.uuid)
)
snapshot_info += ' name: "{0}"\n'.format(
input.resource.name
)

snapshot_info += "\n"

with transaction.atomic():
atomic_wfrun = WorkflowRun.objects.select_for_update() \
.get(uuid=runjob.workflow_run.uuid)
atomic_wfrun = WorkflowRun.objects.select_for_update().get(
uuid=runjob.workflow_run.uuid
)
if atomic_wfrun.description is None:
atomic_wfrun.description = ""
atomic_wfrun.description += snapshot_info
atomic_wfrun.save(update_fields=["description"])
except AttributeError: # This happens during tests where not all fields are set
except (
AttributeError
): # This happens during tests where not all fields are set
pass
except Exception as e:
print(e)
Expand Down Expand Up @@ -1016,8 +1060,13 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
):
to = [user.email]
email_template = "rodan/email/workflow_run_failed.html"
context = { "name": workflow_run.name, "description": workflow_run.description}
registry.tasks["rodan.core.send_templated_email"].apply_async((to, email_template, context))
context = {
"name": workflow_run.name,
"description": workflow_run.description,
}
registry.tasks["rodan.core.send_templated_email"].apply_async(
(to, email_template, context)
)

def _add_error_information_to_runjob(self, exc, einfo):
# Any job using the default_on_failure method can define an error_information
Expand Down
4 changes: 4 additions & 0 deletions rodan-main/code/rodan/jobs/extract_c_clefs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Version string of the extract_c_clefs job package.
__version__ = "1.0.0"
from rodan.jobs import module_loader

# Hand the job's module path to Rodan's module_loader helper.
# NOTE(review): presumably this imports the module so that the task class it
# defines gets registered -- confirm against rodan.jobs.module_loader.
module_loader("rodan.jobs.extract_c_clefs.base")
67 changes: 67 additions & 0 deletions rodan-main/code/rodan/jobs/extract_c_clefs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from rodan.jobs.base import RodanTask
from .extract_c_clefs import *
import cv2 as cv
import os
import json
import logging

# Module-level logger bound to the "rodan" logger name; used by the job below.
logger = logging.getLogger("rodan")


class ExtractCClefs(RodanTask):
    """Rodan job that exports the C clefs of a page image as separate PNG files.

    Inputs (exactly one resource each):
        * "PNG Image" -- an ``image/rgba+png`` resource.
        * "XML file"  -- an ``application/gamera+xml`` resource.

    Output:
        * "C Clef" -- a resource list of ``image/rgba+png`` images, one file per
          extracted clef.
    """

    name = "Extract C Clefs"
    author = "Lucas March"
    description = "Finds the C clefs from a generated XML file from the interactive classifier and exports them to seperate images."
    enabled = True
    category = "Image Processing"
    interactive = False
    # Settings schema; "job_queue" selects the queue this job is dispatched to.
    settings = {
        "title": "Settings",
        "type": "object",
        "job_queue": "Python3",
    }
    input_port_types = (
        {
            "name": "PNG Image",
            "minimum": 1,
            "maximum": 1,
            "resource_types": ["image/rgba+png"],
        },
        {
            "name": "XML file",
            "minimum": 1,
            "maximum": 1,
            "resource_types": ["application/gamera+xml"],
        },
    )
    output_port_types = (
        {
            "name": "C Clef",
            "minimum": 1,
            "maximum": 20,
            "is_list": True,
            "resource_types": ["image/rgba+png"],
        },
    )

    def run_my_task(self, inputs, settings, outputs):
        """Extract the C clefs and write one PNG per clef into the output folder.

        The helpers ``load_image``, ``load_xml``, ``extract_coords``,
        ``crop_images`` and ``save_image`` come from the sibling
        ``extract_c_clefs`` module (wildcard import at the top of the file).

        Args:
            inputs: Mapping of input port name to a list of resource dicts. Reads
                ``resource_path`` and ``resource_name`` of the first "PNG Image"
                and ``resource_path`` of the first "XML file".
            settings: Job settings (unused by this job).
            outputs: Mapping of output port name to a list of output dicts. Reads
                ``resource_folder`` of the first "C Clef" entry.

        Returns:
            ``True`` once every cropped image has been saved.

        Raises:
            Exception: If no C clef coordinates are found in the XML file.
        """
        logger.info("Running C Clef Extraction")

        png_input = inputs["PNG Image"][0]
        image_path = png_input["resource_path"]
        image_name = png_input["resource_name"]
        xml_path = inputs["XML file"][0]["resource_path"]

        image = load_image(image_path)
        xml = load_xml(xml_path)
        coords = extract_coords(xml)
        if not coords:
            raise Exception("No C Clefs found in XML File.")
        cropped_images = crop_images(image, coords)

        output_base_path = outputs["C Clef"][0]["resource_folder"]
        logger.info("output base path %s", output_base_path)

        # Files are numbered from 1. os.path.join keeps each file inside the
        # output folder whether or not the folder path carries a trailing
        # separator (plain string concatenation only worked when it did).
        for index, cropped_image in enumerate(cropped_images, start=1):
            output_path = os.path.join(
                output_base_path, f"{image_name}_{index}.png"
            )
            save_image(cropped_image, output_path)

        return True
Loading
Loading