Skip to content

Commit

Permalink
Merge branch 'main' into batch_endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Dec 9, 2024
2 parents b57619d + b7f3f31 commit 5c76025
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 58 deletions.
1 change: 1 addition & 0 deletions src/server/operandi_server/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class WorkflowArguments(BaseModel):
workspace_id: str
input_file_grp: Optional[str] = DEFAULT_FILE_GRP
remove_file_grps: Optional[str] = ""
preserve_file_grps: Optional[str] = ""
mets_name: Optional[str] = DEFAULT_METS_BASENAME

class Config:
Expand Down
2 changes: 2 additions & 0 deletions src/server/operandi_server/models/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class WorkflowRsrc(Resource):
# datetime: (datetime) - inherited from Resource
uses_mets_server: bool
executable_steps: List[str]
producible_file_groups: List[str]

class Config:
allow_population_by_field_name = True
Expand All @@ -29,6 +30,7 @@ def from_db_workflow(db_workflow: DBWorkflow):
description=db_workflow.details,
uses_mets_server=db_workflow.uses_mets_server,
executable_steps=db_workflow.executable_steps,
producible_file_groups=db_workflow.producible_file_groups,
datetime=db_workflow.datetime,
)

Expand Down
53 changes: 36 additions & 17 deletions src/server/operandi_server/routers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
convert_oton_with_handling,
get_db_workflow_job_with_handling,
get_db_workflow_with_handling,
nf_script_executable_steps_with_handling,
nf_script_uses_mets_server_with_handling,
nf_script_extract_metadata_with_handling,
push_status_request_to_rabbitmq,
validate_oton_with_handling
)
from .workspace_utils import check_if_file_group_exists_with_handling, get_db_workspace_with_handling
from .workspace_utils import (
check_if_file_group_exists_with_handling, get_db_workspace_with_handling, find_file_groups_to_remove_with_handling)
from .user_utils import user_auth_with_handling


Expand Down Expand Up @@ -167,15 +167,15 @@ async def insert_production_workflows(self, production_nf_wfs_dir: Path = get_nf
SERVER_WORKFLOWS_ROUTER, resource_id=path.stem, exists_ok=True)
nf_script_dest = join(workflow_dir, path.name)
copyfile(src=path, dst=nf_script_dest)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
nf_metadata = await nf_script_extract_metadata_with_handling(self.logger, nf_script_dest)
self.logger.info(
f"Inserting: {workflow_id}, uses_mets_server: {uses_mets_server}, script path: {nf_script_dest}")
f"Inserting: {workflow_id}, metadata: {nf_metadata}, script path: {nf_script_dest}")
await db_create_workflow(
user_id="Operandi Server",
workflow_id=workflow_id, workflow_dir=workflow_dir, workflow_script_path=nf_script_dest,
workflow_script_base=path.name, uses_mets_server=uses_mets_server, executable_steps=executable_steps,
details=wf_detail)
workflow_script_base=path.name, uses_mets_server=nf_metadata["uses_mets_server"],
executable_steps=nf_metadata["executable_steps"],
producible_file_groups=nf_metadata["producible_file_groups"], details=wf_detail)
self.production_workflows.append(workflow_id)

async def list_workflows(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> List[WorkflowRsrc]:
Expand Down Expand Up @@ -224,12 +224,12 @@ async def upload_workflow_script(
message = "Failed to receive the workflow resource"
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
nf_metadata = await nf_script_extract_metadata_with_handling(self.logger, nf_script_dest)
db_workflow = await db_create_workflow(
user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir,
workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename,
uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details)
uses_mets_server=nf_metadata["uses_mets_server"], executable_steps=nf_metadata["executable_steps"],
producible_file_groups=nf_metadata["producible_file_groups"], details=details)
return WorkflowRsrc.from_db_workflow(db_workflow)

async def update_workflow_script(
Expand Down Expand Up @@ -260,12 +260,12 @@ async def update_workflow_script(
message = f"Failed to receive the workflow resource"
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
nf_metadata = await nf_script_extract_metadata_with_handling(self.logger, nf_script_dest)
db_workflow = await db_create_workflow(
user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir,
workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename,
uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details)
uses_mets_server=nf_metadata["uses_mets_server"], executable_steps=nf_metadata["executable_steps"],
producible_file_groups=nf_metadata["producible_file_groups"], details=details)
return WorkflowRsrc.from_db_workflow(db_workflow)

async def get_workflow_job_status(
Expand Down Expand Up @@ -377,14 +377,20 @@ async def submit_to_rabbitmq_queue(
try:
workspace_id = workflow_args.workspace_id
input_file_grp = workflow_args.input_file_grp
# TODO: Verify if the file groups requested to be removed are in fact
# TODO: Verify if the file groups requested to be removed/preserved are in fact
# going to be produced in the future by the used workflow
remove_file_grps = workflow_args.remove_file_grps
preserve_file_grps = workflow_args.preserve_file_grps
except Exception as error:
message = "Failed to parse workflow arguments"
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

if remove_file_grps and preserve_file_grps:
message = "`remove_file_grps` and `preserve_file_grps` fields are mutually exclusive. Provide only one."
self.logger.error(f"{message}")
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

# Check the availability and readiness of the workspace to be used
db_workspace = await get_db_workspace_with_handling(self.logger, workspace_id=workspace_id)
if not check_if_file_group_exists_with_handling(self.logger, db_workspace, input_file_grp):
Expand All @@ -394,6 +400,19 @@ async def submit_to_rabbitmq_queue(

# Check the availability of the workflow to be used
db_workflow = await get_db_workflow_with_handling(self.logger, workflow_id=workflow_id)
if preserve_file_grps:
self.logger.info(f"Finding file groups to be removed based on the reproducible/preserve file groups")
remove_file_grps = find_file_groups_to_remove_with_handling(self.logger, db_workspace, preserve_file_grps)
remove_file_grps = ",".join(remove_file_grps)
self.logger.info(f"remove_file_grps: {remove_file_grps}")
list_preserver_file_grps = preserve_file_grps.split(',')
remove_file_grps_reproducible = []
for file_group in db_workflow.producible_file_groups:
if file_group not in list_preserver_file_grps:
remove_file_grps_reproducible.append(file_group)
if len(remove_file_grps_reproducible) > 0:
remove_file_grps += f",{','.join(remove_file_grps_reproducible)}"
self.logger.info(f"remove_file_grps including reproducible: {remove_file_grps}")

try:
# Create job request parameters
Expand Down Expand Up @@ -423,8 +442,8 @@ async def submit_to_rabbitmq_queue(

self._push_job_to_rabbitmq(
user_id=py_user_action.user_id, user_type=user_account_type, workflow_id=workflow_id,
workspace_id=workspace_id, job_id=job_id, input_file_grp=input_file_grp,
remove_file_grps=remove_file_grps, partition=partition, cpus=cpus, ram=ram
workspace_id=workspace_id, job_id=job_id, input_file_grp=input_file_grp, remove_file_grps=remove_file_grps,
partition=partition, cpus=cpus, ram=ram
)
await db_increase_processing_stats_with_handling(
self.logger, find_user_id=py_user_action.user_id, pages_submitted=db_workspace.pages_amount)
Expand Down
50 changes: 17 additions & 33 deletions src/server/operandi_server/routers/workflow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,51 +49,35 @@ async def get_db_workflow_job_with_handling(logger, job_id: str, check_local_exi
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
return db_workflow_job

async def nf_script_uses_mets_server_with_handling(
logger, nf_script_path: str, search_string: str = PARAMS_KEY_METS_SOCKET_PATH
) -> bool:
try:
with open(nf_script_path) as nf_file:
line = nf_file.readline()
while line:
if search_string in line:
return True
line = nf_file.readline()
return False
except Exception as error:
message = "Failed to identify whether a mets server is used or not in the provided Nextflow workflow."
logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

async def nf_script_executable_steps_with_handling(logger, nf_script_path: str) -> List[str]:
# TODO: Find a way to simplify that potentially by getting the metadata from OtoN directly
# However, what about user defined workflows then?
async def nf_script_extract_metadata_with_handling(logger, nf_script_path: str) -> dict:
    """Extract metadata from a Nextflow workflow script by scanning it line by line.

    The returned dict contains:
      - "uses_mets_server" (bool): True if the script references the mets server
        socket path parameter (PARAMS_KEY_METS_SOCKET_PATH) anywhere.
      - "executable_steps" (List[str]): ocrd processor executables found, in order.
      - "producible_file_groups" (List[str]): double-quoted tokens found on the
        processor invocation lines — presumably the output file groups.
        NOTE(review): this is a heuristic text scan, not a Nextflow parse; it
        assumes the OtoN-generated script layout — confirm for user-provided scripts.

    :param logger: logger used for error/info reporting
    :param nf_script_path: full path of the Nextflow script on the server
    :return: metadata dict as described above
    :raises HTTPException: 422 if the script cannot be read/scanned
    """
    metadata = {"uses_mets_server": False}
    processor_executables: List[str] = []
    file_groups: List[str] = []
    try:
        with open(nf_script_path) as nf_file:
            for line in nf_file:
                if not metadata["uses_mets_server"] and PARAMS_KEY_METS_SOCKET_PATH in line:
                    metadata["uses_mets_server"] = True
                # Drop a trailing close-paren + newline so quoted tokens at line end match
                edited_line = line.replace(")\n", "")
                for word in edited_line.split(' '):
                    if "ocrd-" in word:
                        processor_executables.append(word)
                        break
                    # len check prevents a lone '"' token from adding an empty group name
                    if len(word) > 1 and word.startswith('"') and word.endswith('"'):
                        file_groups.append(word[1:-1])
                        break
        metadata["executable_steps"] = processor_executables
        metadata["producible_file_groups"] = file_groups
    except Exception as error:
        message = "Failed to extract the metadata of the provided Nextflow workflow."
        logger.error(f"{message}, error: {error}")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)
    logger.info(f"Extracted Nextflow workflow metadata: {metadata}")
    return metadata

async def validate_oton_with_handling(logger, ocrd_process_txt_path: str):
try:
Expand Down
21 changes: 16 additions & 5 deletions src/server/operandi_server/routers/workspace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def create_workspace_bag(db_workspace) -> Union[str, None]:
workspace, ocrd_identifier=db_workspace.ocrd_identifier, dest=bag_dest, ocrd_mets=mets_basename, processes=1)
return bag_dest


def create_workspace_bag_from_remote_url(
mets_url: str, workspace_id: str, bag_dest: str, mets_basename: str = DEFAULT_METS_BASENAME,
preserve_file_grps: List[str] = None
Expand Down Expand Up @@ -188,10 +187,9 @@ def remove_file_groups_with_handling(
logger, db_workspace, file_groups: List[str], recursive: bool = True, force: bool = True
) -> List[str]:
try:
resolver = Resolver()
# Create an OCR-D Workspace from a remote mets URL
# without downloading the files referenced in the mets file
workspace = resolver.workspace_from_url(
workspace = Resolver().workspace_from_url(
mets_url=db_workspace.workspace_mets_path, clobber_mets=False, mets_basename=db_workspace.mets_basename,
download=False)
for file_group in file_groups:
Expand All @@ -205,8 +203,7 @@ def remove_file_groups_with_handling(

def extract_file_groups_from_db_model_with_handling(logger, db_workspace) -> List[str]:
try:
resolver = Resolver()
workspace = resolver.workspace_from_url(
workspace = Resolver().workspace_from_url(
mets_url=db_workspace.workspace_mets_path, clobber_mets=False, mets_basename=db_workspace.mets_basename,
download=False)
return workspace.mets.file_groups
Expand All @@ -219,8 +216,22 @@ def check_if_file_group_exists_with_handling(logger, db_workspace, file_group: s
file_groups = extract_file_groups_from_db_model_with_handling(logger, db_workspace)
return file_group in file_groups

def find_file_groups_to_remove_with_handling(logger, db_workspace, preserve_file_grps: str) -> List[str]:
    """Return the workspace mets file groups that are NOT listed in `preserve_file_grps`.

    :param logger: logger used for error reporting
    :param db_workspace: DB workspace model providing `workspace_mets_path` and `mets_basename`
    :param preserve_file_grps: comma-separated file group names to keep (e.g. "DEFAULT,OCR-D-BIN")
    :return: list of file groups present in the workspace mets but not preserved
    :raises HTTPException: 500 if the workspace mets cannot be loaded
    """
    # Strip whitespace so inputs such as "GRP1, GRP2" are handled correctly;
    # a set gives O(1) membership tests and avoids rebinding the str parameter.
    preserved = {grp.strip() for grp in preserve_file_grps.split(',')}
    try:
        # Load the workspace from its mets without downloading the referenced files
        workspace = Resolver().workspace_from_url(
            mets_url=db_workspace.workspace_mets_path, clobber_mets=False, mets_basename=db_workspace.mets_basename,
            download=False)
        remove_groups = [grp for grp in workspace.mets.file_groups if grp not in preserved]
    except Exception as error:
        message = "Failed to find file groups to be removed based on the file groups to be preserved"
        logger.error(f"{message}, error: {error}")
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)
    return remove_groups

async def get_user_workspaces(
user_id: str, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None
) -> List[WorkspaceRsrc]:
db_workspaces = await db_get_all_workspaces_by_user(user_id=user_id, start_date=start_date, end_date=end_date)
return [WorkspaceRsrc.from_db_workspace(db_workspace) for db_workspace in db_workspaces]

10 changes: 7 additions & 3 deletions src/utils/operandi_utils/database/db_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
# TODO: This also updates to satisfy the PUT method in the Workflow Manager - fix this
async def db_create_workflow(
user_id: str, workflow_id: str, workflow_dir: str, workflow_script_base: str, workflow_script_path: str,
uses_mets_server: bool, executable_steps: List[str], details: str = "Workflow"
uses_mets_server: bool, executable_steps: List[str], producible_file_groups: List[str], details: str = "Workflow"
) -> DBWorkflow:
try:
db_workflow = await db_get_workflow(workflow_id)
Expand All @@ -20,6 +20,7 @@ async def db_create_workflow(
workflow_script_path=workflow_script_path,
uses_mets_server=uses_mets_server,
executable_steps=executable_steps,
producible_file_groups=producible_file_groups,
datetime=datetime.now(),
details=details
)
Expand All @@ -31,6 +32,7 @@ async def db_create_workflow(
db_workflow.workflow_script_path = workflow_script_path
db_workflow.uses_mets_server = uses_mets_server
db_workflow.executable_steps = executable_steps
db_workflow.producible_file_groups = producible_file_groups
db_workflow.details = details
await db_workflow.save()
return db_workflow
Expand All @@ -39,11 +41,11 @@ async def db_create_workflow(
@call_sync
async def sync_db_create_workflow(
user_id: str, workflow_id: str, workflow_dir: str, workflow_script_base: str, workflow_script_path: str,
uses_mets_server: bool, executable_steps: List[str], details: str = "Workflow"
uses_mets_server: bool, executable_steps: List[str], producible_file_groups: List[str], details: str = "Workflow"
) -> DBWorkflow:
return await db_create_workflow(
user_id, workflow_id, workflow_dir, workflow_script_base, workflow_script_path, uses_mets_server,
executable_steps, details)
executable_steps, producible_file_groups, details)


async def db_get_workflow(workflow_id: str) -> DBWorkflow:
Expand Down Expand Up @@ -88,6 +90,8 @@ async def db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow:
db_workflow.uses_mets_server = value
elif key == "executable_steps":
db_workflow.executable_steps = value
elif key == "producible_file_groups":
db_workflow.producible_file_groups = value
elif key == "deleted":
db_workflow.deleted = value
elif key == "details":
Expand Down
2 changes: 2 additions & 0 deletions src/utils/operandi_utils/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class DBWorkflow(Document):
workflow_script_path Nextflow workflow file full path on the server
uses_mets_server Whether the NF script forwards requests to a workspace mets server
executable_steps A list of ocrd_processor executables
producible_file_groups A list of file groups that will be produced after executing the current workflow
deleted Whether the entry has been deleted locally from the server
datetime Shows the created date time of the entry
details Extra user specified details about this entry
Expand All @@ -114,6 +115,7 @@ class DBWorkflow(Document):
workflow_script_path: str
uses_mets_server: bool
executable_steps: List[str]
producible_file_groups: List[str]
deleted: bool = False
datetime = datetime.now()
details: Optional[str]
Expand Down

0 comments on commit 5c76025

Please sign in to comment.