From f5611e8e5db1f438f49a23323ff8b76228056dbf Mon Sep 17 00:00:00 2001 From: Mehmed Mustafa Date: Tue, 3 Dec 2024 16:00:20 +0100 Subject: [PATCH 1/3] add: preserve file groups option to workflow jobs --- src/server/operandi_server/models/base.py | 1 + src/server/operandi_server/models/workflow.py | 2 + .../operandi_server/routers/workflow.py | 50 ++++++++++++------- .../operandi_server/routers/workflow_utils.py | 50 +++++++------------ .../routers/workspace_utils.py | 20 ++++++-- .../operandi_utils/database/db_workflow.py | 10 ++-- src/utils/operandi_utils/database/models.py | 2 + 7 files changed, 77 insertions(+), 58 deletions(-) diff --git a/src/server/operandi_server/models/base.py b/src/server/operandi_server/models/base.py index 11bc7a8d..19a37769 100644 --- a/src/server/operandi_server/models/base.py +++ b/src/server/operandi_server/models/base.py @@ -18,6 +18,7 @@ class WorkflowArguments(BaseModel): workspace_id: str input_file_grp: Optional[str] = DEFAULT_FILE_GRP remove_file_grps: Optional[str] = "" + preserve_file_grps: Optional[str] = "" mets_name: Optional[str] = DEFAULT_METS_BASENAME class Config: diff --git a/src/server/operandi_server/models/workflow.py b/src/server/operandi_server/models/workflow.py index 0ea8399c..6d73a5d4 100644 --- a/src/server/operandi_server/models/workflow.py +++ b/src/server/operandi_server/models/workflow.py @@ -16,6 +16,7 @@ class WorkflowRsrc(Resource): # datetime: (datetime) - inherited from Resource uses_mets_server: bool executable_steps: List[str] + producible_file_groups: List[str] class Config: allow_population_by_field_name = True @@ -29,6 +30,7 @@ def from_db_workflow(db_workflow: DBWorkflow): description=db_workflow.details, uses_mets_server=db_workflow.uses_mets_server, executable_steps=db_workflow.executable_steps, + producible_file_groups=db_workflow.producible_file_groups, datetime=db_workflow.datetime, ) diff --git a/src/server/operandi_server/routers/workflow.py b/src/server/operandi_server/routers/workflow.py index f06ca741..2ade44db 100644 --- a/src/server/operandi_server/routers/workflow.py +++ b/src/server/operandi_server/routers/workflow.py @@ -30,10 +30,11 @@ convert_oton_with_handling, get_db_workflow_job_with_handling, get_db_workflow_with_handling, - nf_script_uses_mets_server_with_handling, - validate_oton_with_handling, nf_script_executable_steps_with_handling + nf_script_extract_metadata_with_handling, + validate_oton_with_handling, ) -from .workspace_utils import check_if_file_group_exists_with_handling, get_db_workspace_with_handling +from .workspace_utils import ( + check_if_file_group_exists_with_handling, get_db_workspace_with_handling, find_file_groups_to_remove_with_handling) from .user import RouterUser @@ -174,15 +175,15 @@ async def insert_production_workflows(self, production_nf_wfs_dir: Path = get_nf SERVER_WORKFLOWS_ROUTER, resource_id=path.stem, exists_ok=True) nf_script_dest = join(workflow_dir, path.name) copyfile(src=path, dst=nf_script_dest) - uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest) - executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest) + nf_metadata = await nf_script_extract_metadata_with_handling(self.logger, nf_script_dest) self.logger.info( - f"Inserting: {workflow_id}, uses_mets_server: {uses_mets_server}, script path: {nf_script_dest}") + f"Inserting: {workflow_id}, metadata: {nf_metadata}, script path: {nf_script_dest}") await db_create_workflow( user_id="Operandi Server", workflow_id=workflow_id, 
workflow_dir=workflow_dir, workflow_script_path=nf_script_dest, - workflow_script_base=path.name, uses_mets_server=uses_mets_server, executable_steps=executable_steps, - details=wf_detail) + workflow_script_base=path.name, uses_mets_server=nf_metadata["uses_mets_server"], + executable_steps=nf_metadata["executable_steps"], + producible_file_groups=nf_metadata["producible_file_groups"], details=wf_detail) self.production_workflows.append(workflow_id) async def list_workflows(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> List[WorkflowRsrc]: @@ -231,12 +232,12 @@ async def upload_workflow_script( message = "Failed to receive the workflow resource" self.logger.error(f"{message}, error: {error}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) - uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest) - executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest) + nf_metadata = await nf_script_extract_metadata_with_handling(self.logger, nf_script_dest) db_workflow = await db_create_workflow( user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir, workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename, - uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details) + uses_mets_server=nf_metadata["uses_mets_server"], executable_steps=nf_metadata["executable_steps"], + producible_file_groups=nf_metadata["producible_file_groups"], details=details) return WorkflowRsrc.from_db_workflow(db_workflow) async def update_workflow_script( @@ -267,12 +268,12 @@ async def update_workflow_script( message = f"Failed to receive the workflow resource" self.logger.error(f"{message}, error: {error}") raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message) - uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest) - executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest) + nf_metadata = await nf_script_extract_metadata_with_handling(self.logger, nf_script_dest) db_workflow = await db_create_workflow( user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir, workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename, - uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details) + uses_mets_server=nf_metadata["uses_mets_server"], executable_steps=nf_metadata["executable_steps"], + producible_file_groups=nf_metadata["producible_file_groups"], details=details) return WorkflowRsrc.from_db_workflow(db_workflow) async def get_workflow_job_status( @@ -384,14 +385,20 @@ async def submit_to_rabbitmq_queue( try: workspace_id = workflow_args.workspace_id input_file_grp = workflow_args.input_file_grp - # TODO: Verify if the file groups requested to be removed are in fact + # TODO: Verify if the file groups requested to be removed/preserved are in fact # going to be produced in the future by the used workflow remove_file_grps = workflow_args.remove_file_grps + preserve_file_grps = workflow_args.preserve_file_grps except Exception as error: message = "Failed to parse workflow arguments" self.logger.error(f"{message}, error: {error}") raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) + if remove_file_grps and preserve_file_grps: + message = "`remove_file_grps` and `preserve_file_grps` fields are mutually exclusive. 
Provide only one."
+            self.logger.error(message)
+            raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)
+
         # Check the availability and readiness of the workspace to be used
         db_workspace = await get_db_workspace_with_handling(self.logger, workspace_id=workspace_id)
         if not check_if_file_group_exists_with_handling(self.logger, db_workspace, input_file_grp):
@@ -401,6 +408,15 @@ async def submit_to_rabbitmq_queue(

         # Check the availability of the workflow to be used
         db_workflow = await get_db_workflow_with_handling(self.logger, workflow_id=workflow_id)
+        if preserve_file_grps:
+            self.logger.info("Finding file groups to be removed based on the file groups to be preserved")
+            self.logger.info(f"preserve_file_grps: {preserve_file_grps}")
+            remove_file_grps = find_file_groups_to_remove_with_handling(self.logger, db_workspace, preserve_file_grps)
+            remove_file_grps = ",".join(remove_file_grps)
+            self.logger.info(f"remove_file_grps: {remove_file_grps}")
+            file_grps_reproducible = ",".join(db_workflow.producible_file_groups)
+            remove_file_grps += f",{file_grps_reproducible}"
+            self.logger.info(f"remove_file_grps including reproducible: {remove_file_grps}")

         try:
             # Create job request parameters
@@ -430,8 +446,8 @@ async def submit_to_rabbitmq_queue(
             self._push_job_to_rabbitmq(
                 user_id=py_user_action.user_id, user_type=user_account_type, workflow_id=workflow_id,
-                workspace_id=workspace_id, job_id=job_id, input_file_grp=input_file_grp,
-                remove_file_grps=remove_file_grps, partition=partition, cpus=cpus, ram=ram
+                workspace_id=workspace_id, job_id=job_id, input_file_grp=input_file_grp, remove_file_grps=remove_file_grps,
+                partition=partition, cpus=cpus, ram=ram
             )
             await db_increase_processing_stats_with_handling(
                 self.logger, find_user_id=py_user_action.user_id, pages_submitted=db_workspace.pages_amount)
diff --git a/src/server/operandi_server/routers/workflow_utils.py b/src/server/operandi_server/routers/workflow_utils.py
index 591798b3..b8bd6a63 100644
--- a/src/server/operandi_server/routers/workflow_utils.py
+++ b/src/server/operandi_server/routers/workflow_utils.py
@@ -41,51 +41,35 @@ async def get_db_workflow_job_with_handling(logger, job_id: str, check_local_exi
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
     return db_workflow_job

-async def nf_script_uses_mets_server_with_handling(
-    logger, nf_script_path: str, search_string: str = PARAMS_KEY_METS_SOCKET_PATH
-) -> bool:
-    try:
-        with open(nf_script_path) as nf_file:
-            line = nf_file.readline()
-            while line:
-                if search_string in line:
-                    return True
-                line = nf_file.readline()
-            return False
-    except Exception as error:
-        message = "Failed to identify whether a mets server is used or not in the provided Nextflow workflow."
-        logger.error(f"{message}, error: {error}")
-        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)
-
-async def nf_script_executable_steps_with_handling(logger, nf_script_path: str) -> List[str]:
+# TODO: Find a way to simplify this, potentially by getting the metadata directly from OtoN.
+# However, what about user-defined workflows then? 
+async def nf_script_extract_metadata_with_handling(logger, nf_script_path: str) -> dict: + metadata = {"uses_mets_server": False} processor_executables: List[str] = [] + file_groups: List[str] = [] try: with open(nf_script_path) as nf_file: line = nf_file.readline() while line: - for word in line.split(' '): + if not metadata["uses_mets_server"] and PARAMS_KEY_METS_SOCKET_PATH in line: + metadata["uses_mets_server"] = True + edited_line = line.replace(")\n", "") + for word in edited_line.split(' '): if "ocrd-" in word: processor_executables.append(word) break + if word.startswith('"') and word.endswith('"'): + file_groups.append(word[1:-1]) + break line = nf_file.readline() + metadata["executable_steps"] = processor_executables + metadata["producible_file_groups"] = file_groups except Exception as error: - message = "Failed to identify processor executables in the provided Nextflow workflow." - logger.error(f"{message}, error: {error}") - raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) - - """ - apptainer_images: List[str] = [] - try: - for executable in processor_executables: - apptainer_images.append(OCRD_PROCESSOR_EXECUTABLE_TO_IMAGE[executable]) - except Exception as error: - message = "Failed to produce apptainer image names from the processor executables list" + message = "Failed to extract the metadata of the provided Nextflow workflow." logger.error(f"{message}, error: {error}") raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message) - return apptainer_images - """ - logger.info(f"Found processor executables: {processor_executables}") - return processor_executables + logger.info(f"Extracted Nextflow workflow metadata: {metadata}") + return metadata async def validate_oton_with_handling(logger, ocrd_process_txt_path: str): try: diff --git a/src/server/operandi_server/routers/workspace_utils.py b/src/server/operandi_server/routers/workspace_utils.py index f3ceb85f..da2d700d 100644 --- a/src/server/operandi_server/routers/workspace_utils.py +++ b/src/server/operandi_server/routers/workspace_utils.py @@ -74,7 +74,6 @@ def create_workspace_bag(db_workspace) -> Union[str, None]: workspace, ocrd_identifier=db_workspace.ocrd_identifier, dest=bag_dest, ocrd_mets=mets_basename, processes=1) return bag_dest - def create_workspace_bag_from_remote_url( mets_url: str, workspace_id: str, bag_dest: str, mets_basename: str = DEFAULT_METS_BASENAME, preserve_file_grps: List[str] = None @@ -186,10 +185,9 @@ def remove_file_groups_with_handling( logger, db_workspace, file_groups: List[str], recursive: bool = True, force: bool = True ) -> List[str]: try: - resolver = Resolver() # Create an OCR-D Workspace from a remote mets URL # without downloading the files referenced in the mets file - workspace = resolver.workspace_from_url( + workspace = Resolver().workspace_from_url( mets_url=db_workspace.workspace_mets_path, clobber_mets=False, mets_basename=db_workspace.mets_basename, download=False) for file_group in file_groups: @@ -203,8 +201,7 @@ def remove_file_groups_with_handling( def extract_file_groups_from_db_model_with_handling(logger, db_workspace) -> List[str]: try: - resolver = Resolver() - workspace = resolver.workspace_from_url( + workspace = Resolver().workspace_from_url( mets_url=db_workspace.workspace_mets_path, clobber_mets=False, mets_basename=db_workspace.mets_basename, download=False) return workspace.mets.file_groups @@ -216,3 +213,16 @@ def extract_file_groups_from_db_model_with_handling(logger, db_workspace) -> Lis def 
check_if_file_group_exists_with_handling(logger, db_workspace, file_group: str) -> bool:
     file_groups = extract_file_groups_from_db_model_with_handling(logger, db_workspace)
     return file_group in file_groups
+
+def find_file_groups_to_remove_with_handling(logger, db_workspace, preserve_file_grps: str) -> List[str]:
+    preserve_list: List[str] = preserve_file_grps.split(',')
+    try:
+        workspace = Resolver().workspace_from_url(
+            mets_url=db_workspace.workspace_mets_path, clobber_mets=False, mets_basename=db_workspace.mets_basename,
+            download=False)
+        remove_groups = [x for x in workspace.mets.file_groups if x not in preserve_list]
+    except Exception as error:
+        message = "Failed to find file groups to be removed based on the file groups to be preserved"
+        logger.error(f"{message}, error: {error}")
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)
+    return remove_groups
diff --git a/src/utils/operandi_utils/database/db_workflow.py b/src/utils/operandi_utils/database/db_workflow.py
index 78b7f688..c2707424 100644
--- a/src/utils/operandi_utils/database/db_workflow.py
+++ b/src/utils/operandi_utils/database/db_workflow.py
@@ -7,7 +7,7 @@
 # TODO: This also updates to satisfy the PUT method in the Workflow Manager - fix this
 async def db_create_workflow(
     user_id: str, workflow_id: str, workflow_dir: str, workflow_script_base: str, workflow_script_path: str,
-    uses_mets_server: bool, executable_steps: List[str], details: str = "Workflow"
+    uses_mets_server: bool, executable_steps: List[str], producible_file_groups: List[str], details: str = "Workflow"
 ) -> DBWorkflow:
     try:
         db_workflow = await db_get_workflow(workflow_id)
@@ -20,6 +20,7 @@ async def db_create_workflow(
             workflow_script_path=workflow_script_path,
             uses_mets_server=uses_mets_server,
             executable_steps=executable_steps,
+            producible_file_groups=producible_file_groups,
             datetime=datetime.now(),
             details=details
         )
@@ -31,6 +32,7 @@ async def db_create_workflow(
         db_workflow.workflow_script_path = workflow_script_path
         db_workflow.uses_mets_server = uses_mets_server
         db_workflow.executable_steps = executable_steps
+        db_workflow.producible_file_groups = producible_file_groups
         db_workflow.details = details
         await db_workflow.save()
     return db_workflow
@@ -39,11 +41,11 @@ async def db_create_workflow(
 @call_sync
 async def sync_db_create_workflow(
     user_id: str, workflow_id: str, workflow_dir: str, workflow_script_base: str, workflow_script_path: str,
-    uses_mets_server: bool, executable_steps: List[str], details: str = "Workflow"
+    uses_mets_server: bool, executable_steps: List[str], producible_file_groups: List[str], details: str = "Workflow"
 ) -> DBWorkflow:
     return await db_create_workflow(
         user_id, workflow_id, workflow_dir, workflow_script_base, workflow_script_path, uses_mets_server,
-        executable_steps, details)
+        executable_steps, producible_file_groups, details)


 async def db_get_workflow(workflow_id: str) -> DBWorkflow:
@@ -88,6 +90,8 @@ async def db_update_workflow(find_workflow_id: str, **kwargs) -> DBWorkflow:
             db_workflow.uses_mets_server = value
         elif key == "executable_steps":
             db_workflow.executable_steps = value
+        elif key == "producible_file_groups":
+            db_workflow.producible_file_groups = value
         elif key == "deleted":
             db_workflow.deleted = value
         elif key == "details":
diff --git a/src/utils/operandi_utils/database/models.py b/src/utils/operandi_utils/database/models.py
index d885df0e..fedf47d0 100644
--- a/src/utils/operandi_utils/database/models.py
+++ 
b/src/utils/operandi_utils/database/models.py
@@ -103,6 +103,7 @@ class DBWorkflow(Document):
     workflow_script_path    Nextflow workflow file full path on the server
     uses_mets_server        Whether the NF script forwards requests to a workspace mets server
     executable_steps        A list of ocrd_processor executables
+    producible_file_groups  A list of file groups that will be produced after executing the current workflow
     deleted                 Whether the entry has been deleted locally from the server
     datetime                Shows the created date time of the entry
     details                 Extra user specified details about this entry
@@ -114,6 +115,7 @@
     workflow_script_path: str
     uses_mets_server: bool
     executable_steps: List[str]
+    producible_file_groups: List[str]
     deleted: bool = False
     datetime = datetime.now()
     details: Optional[str]

From ceb2a78f41e2ceb445caa071ce04b97bb38dddeb Mon Sep 17 00:00:00 2001
From: Mehmed Mustafa
Date: Fri, 6 Dec 2024 18:17:58 +0100
Subject: [PATCH 2/3] fix: remove stale imports of deleted functions

---
 src/server/operandi_server/routers/workflow.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/server/operandi_server/routers/workflow.py b/src/server/operandi_server/routers/workflow.py
index 48ab0460..02d818f3 100644
--- a/src/server/operandi_server/routers/workflow.py
+++ b/src/server/operandi_server/routers/workflow.py
@@ -28,9 +28,7 @@
     convert_oton_with_handling,
     get_db_workflow_job_with_handling,
     get_db_workflow_with_handling,
-    nf_script_executable_steps_with_handling,
     nf_script_extract_metadata_with_handling,
-    nf_script_uses_mets_server_with_handling,
     push_status_request_to_rabbitmq,
     validate_oton_with_handling
 )

From b7f3f31dbb78a82ea5f12e563bf9c5a2a05190f5 Mon Sep 17 00:00:00 2001
From: Mehmed Mustafa
Date: Mon, 9 Dec 2024 13:56:23 +0100
Subject: [PATCH 3/3] fix: wrong file groups being removed

---
 src/server/operandi_server/routers/workflow.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/src/server/operandi_server/routers/workflow.py b/src/server/operandi_server/routers/workflow.py
index 02d818f3..0c0833a8 100644
--- a/src/server/operandi_server/routers/workflow.py
+++ b/src/server/operandi_server/routers/workflow.py
@@ -395,12 +395,16 @@ async def submit_to_rabbitmq_queue(
         db_workflow = await get_db_workflow_with_handling(self.logger, workflow_id=workflow_id)
         if preserve_file_grps:
             self.logger.info("Finding file groups to be removed based on the file groups to be preserved")
-            self.logger.info(f"preserve_file_grps: {preserve_file_grps}")
             remove_file_grps = find_file_groups_to_remove_with_handling(self.logger, db_workspace, preserve_file_grps)
             remove_file_grps = ",".join(remove_file_grps)
             self.logger.info(f"remove_file_grps: {remove_file_grps}")
-            file_grps_reproducible = ",".join(db_workflow.producible_file_groups)
-            remove_file_grps += f",{file_grps_reproducible}"
+            list_preserve_file_grps = preserve_file_grps.split(',')
+            remove_file_grps_reproducible = []
+            for file_group in db_workflow.producible_file_groups:
+                if file_group not in list_preserve_file_grps:
+                    remove_file_grps_reproducible.append(file_group)
+            if remove_file_grps_reproducible:
+                remove_file_grps += f",{','.join(remove_file_grps_reproducible)}"
             self.logger.info(f"remove_file_grps including reproducible: {remove_file_grps}")

         try:
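---

Note on the combined behavior of PATCH 1 and PATCH 3: when `preserve_file_grps` is passed, the server first schedules for removal every file group already present in the workspace METS that is not preserved, and then additionally schedules the workflow's producible file groups, unless they too are preserved. The following is a minimal, self-contained Python sketch of that resolution logic; the function name and the sample file group values are illustrative only, since the real code derives the workspace groups via find_file_groups_to_remove_with_handling and the producible groups from db_workflow.producible_file_groups:

    from typing import List

    def resolve_remove_file_grps(
        workspace_file_groups: List[str],   # file groups currently referenced in the workspace METS
        producible_file_groups: List[str],  # file groups the chosen workflow will produce
        preserve_file_grps: str,            # comma-separated user input
    ) -> str:
        preserve = preserve_file_grps.split(",")
        # Step 1: remove every existing file group the user did not ask to preserve
        remove = [grp for grp in workspace_file_groups if grp not in preserve]
        # Step 2 (the PATCH 3 fix): remove producible groups only if they are not preserved
        reproducible = [grp for grp in producible_file_groups if grp not in preserve]
        return ",".join(remove + reproducible)

    # Example: "OCR-D-IMG" exists but is not preserved; "OCR-D-BIN" will be produced
    # by the workflow and is not preserved either, so both end up in remove_file_grps.
    print(resolve_remove_file_grps(
        workspace_file_groups=["DEFAULT", "OCR-D-IMG"],
        producible_file_groups=["OCR-D-BIN", "OCR-D-OCR"],
        preserve_file_grps="DEFAULT,OCR-D-OCR",
    ))  # prints: OCR-D-IMG,OCR-D-BIN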