Skip to content

Commit

Permalink
fix: conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Dec 2, 2024
2 parents 068713b + 56da368 commit b9bd746
Show file tree
Hide file tree
Showing 62 changed files with 3,817 additions and 1,469 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ run-tests-utils:

run-tests-broker:
export $(shell sed 's/=.*//' ./tests/.env)
pytest tests/tests_broker/test_*.py -v
pytest tests/tests_broker/test_*.py -s -v

run-tests-harvester:
export $(shell sed 's/=.*//' ./tests/.env)
Expand Down
14 changes: 9 additions & 5 deletions src/broker/operandi_broker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from os.path import join
from pathlib import Path
from sys import exit
from typing import List

from operandi_utils import reconfigure_all_loggers, get_log_file_path_prefix
from operandi_utils.constants import LOG_LEVEL_WORKER, StateJob, StateWorkspace
Expand Down Expand Up @@ -111,6 +112,7 @@ def __callback(self, ch, method, properties, body):

workflow_script_path = Path(workflow_db.workflow_script_path)
nf_uses_mets_server = workflow_db.uses_mets_server
nf_executable_steps = workflow_db.executable_steps
workspace_dir = Path(workspace_db.workspace_dir)
mets_basename = workspace_db.mets_basename
ws_pages_amount = workspace_db.pages_amount
Expand All @@ -132,8 +134,8 @@ def __callback(self, ch, method, properties, body):
workspace_dir=workspace_dir, workspace_base_mets=mets_basename,
workflow_script_path=workflow_script_path, input_file_grp=input_file_grp,
nf_process_forks=nf_process_forks, ws_pages_amount=ws_pages_amount, use_mets_server=nf_uses_mets_server,
file_groups_to_remove=remove_file_grps, cpus=slurm_job_cpus, ram=slurm_job_ram,
partition=slurm_job_partition
nf_executable_steps=nf_executable_steps, file_groups_to_remove=remove_file_grps, cpus=slurm_job_cpus,
ram=slurm_job_ram, partition=slurm_job_partition
)
self.log.info(f"The HPC slurm job was successfully submitted")
except Exception as error:
Expand Down Expand Up @@ -200,7 +202,8 @@ def signal_handler(self, sig, frame):
def prepare_and_trigger_slurm_job(
self, workflow_job_id: str, workspace_id: str, workspace_dir: Path, workspace_base_mets: str,
workflow_script_path: Path, input_file_grp: str, nf_process_forks: int, ws_pages_amount: int,
use_mets_server: bool, file_groups_to_remove: str, cpus: int, ram: int, partition: str
use_mets_server: bool, nf_executable_steps: List[str], file_groups_to_remove: str, cpus: int, ram: int,
partition: str
) -> str:
if self.test_sbatch:
job_deadline_time = HPC_JOB_DEADLINE_TIME_TEST
Expand Down Expand Up @@ -232,8 +235,9 @@ def prepare_and_trigger_slurm_job(
workflow_job_id=workflow_job_id, nextflow_script_path=workflow_script_path,
workspace_id=workspace_id, mets_basename=workspace_base_mets,
input_file_grp=input_file_grp, nf_process_forks=nf_process_forks, ws_pages_amount=ws_pages_amount,
use_mets_server=use_mets_server, file_groups_to_remove=file_groups_to_remove, cpus=cpus, ram=ram,
job_deadline_time=job_deadline_time, partition=partition, qos=qos)
use_mets_server=use_mets_server, nf_executable_steps=nf_executable_steps,
file_groups_to_remove=file_groups_to_remove, cpus=cpus, ram=ram, job_deadline_time=job_deadline_time,
partition=partition, qos=qos)
except Exception as error:
db_stats = sync_db_increase_processing_stats(
find_user_id=self.current_message_user_id, pages_failed=ws_pages_amount)
Expand Down
3 changes: 2 additions & 1 deletion src/server/operandi_server/routers/admin_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self):
endpoint=self.user_workflows, methods=["GET"], status_code=status.HTTP_200_OK,
summary="Get all workflows submitted by the user identified by user_id"
)

async def push_to_ola_hd(self, workspace_id: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
py_user_action = await self.user_authenticator.user_login(auth)
if py_user_action.account_type != AccountType.ADMIN:
Expand Down Expand Up @@ -86,7 +87,7 @@ async def get_users(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())):
message = "Admin privileges required for the endpoint"
self.logger.error(message)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail=message)

users = await db_get_all_user_accounts()
return [PYUserInfo.from_db_user_account(user) for user in users]

Expand Down
56 changes: 44 additions & 12 deletions src/server/operandi_server/routers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.status import HTTP_404_NOT_FOUND

from operandi_utils import get_nf_workflows_dir
from operandi_utils import get_nf_wfs_dir, get_ocrd_process_wfs_dir
from operandi_utils.constants import AccountType, ServerApiTag, StateJob, StateWorkspace
from operandi_utils.database import (
db_create_workflow, db_create_workflow_job, db_get_hpc_slurm_job, db_get_workflow, db_update_workspace,
db_increase_processing_stats_with_handling)
from operandi_utils.oton import OTONConverter
from operandi_utils.rabbitmq import (
get_connection_publisher, RABBITMQ_QUEUE_JOB_STATUSES, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS)
from operandi_server.constants import (
Expand All @@ -30,7 +31,7 @@
get_db_workflow_job_with_handling,
get_db_workflow_with_handling,
nf_script_uses_mets_server_with_handling,
validate_oton_with_handling
validate_oton_with_handling, nf_script_executable_steps_with_handling
)
from .workspace_utils import check_if_file_group_exists_with_handling, get_db_workspace_with_handling
from .user import RouterUser
Expand Down Expand Up @@ -133,10 +134,35 @@ async def _push_status_request_to_rabbitmq(self, job_id: str):
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)

async def insert_production_workflows(self, production_workflows_dir: Path = get_nf_workflows_dir()):
async def produce_production_workflows(
    self,
    ocrd_process_wf_dir: Path = get_ocrd_process_wfs_dir(),
    production_nf_wfs_dir: Path = get_nf_wfs_dir()
):
    """Convert every ocrd process workflow (.txt) into production Nextflow scripts.

    For each `.txt` file found in `ocrd_process_wf_dir`, two Nextflow scripts
    are written to `production_nf_wfs_dir`: `<stem>.nf` (no mets server) and
    `<stem>_with_MS.nf` (with a mets server). Non-file entries and files with
    other extensions are skipped with an info log.
    """
    converter = OTONConverter()
    for wf_path in ocrd_process_wf_dir.iterdir():
        if not wf_path.is_file():
            self.logger.info(f"Skipping non-file path: {wf_path}")
            continue
        if wf_path.suffix != '.txt':
            self.logger.info(f"Skipping non .txt extension file path: {wf_path}")
            continue
        self.logger.info(f"Converting to Nextflow workflow the ocrd process workflow: {wf_path}")
        # Produce both variants from the same source workflow:
        # (with_mets_server flag, output file suffix, wording used in the log line)
        for uses_mets_server, name_suffix, label in (
            (False, ".nf", "without"),
            (True, "_with_MS.nf", "with"),
        ):
            nf_path = Path(production_nf_wfs_dir, f"{wf_path.stem}{name_suffix}")
            converter.convert_oton(
                input_path=wf_path, output_path=str(nf_path), environment="apptainer",
                with_mets_server=uses_mets_server)
            self.logger.info(f"Converted to a Nextflow file {label} a mets server: {nf_path}")

async def insert_production_workflows(self, production_nf_wfs_dir: Path = get_nf_wfs_dir()):
wf_detail = "Workflow provided by the Operandi Server"
self.logger.info(f"Inserting production workflows for Operandi from: {production_workflows_dir}")
for path in production_workflows_dir.iterdir():
self.logger.info(f"Inserting production workflows for Operandi from: {production_nf_wfs_dir}")
for path in production_nf_wfs_dir.iterdir():
if not path.is_file():
self.logger.info(f"Skipping non-file path: {path}")
continue
Expand All @@ -150,11 +176,14 @@ async def insert_production_workflows(self, production_workflows_dir: Path = get
nf_script_dest = join(workflow_dir, path.name)
copyfile(src=path, dst=nf_script_dest)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
self.logger.info(f"Inserting: {workflow_id}, uses_mets_server: {uses_mets_server}, script path: {nf_script_dest}")
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
self.logger.info(
f"Inserting: {workflow_id}, uses_mets_server: {uses_mets_server}, script path: {nf_script_dest}")
await db_create_workflow(
user_id="Operandi Server",
workflow_id=workflow_id, workflow_dir=workflow_dir, workflow_script_path=nf_script_dest,
workflow_script_base=path.name, uses_mets_server=uses_mets_server, details=wf_detail)
workflow_script_base=path.name, uses_mets_server=uses_mets_server, executable_steps=executable_steps,
details=wf_detail)
self.production_workflows.append(workflow_id)

async def list_workflows(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> List[WorkflowRsrc]:
Expand Down Expand Up @@ -204,10 +233,11 @@ async def upload_workflow_script(
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
db_workflow = await db_create_workflow(
user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir,
workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename,
uses_mets_server=uses_mets_server, details=details)
uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details)
return WorkflowRsrc.from_db_workflow(db_workflow)

async def update_workflow_script(
Expand Down Expand Up @@ -239,10 +269,11 @@ async def update_workflow_script(
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
db_workflow = await db_create_workflow(
user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir,
workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename,
uses_mets_server=uses_mets_server, details=details)
uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details)
return WorkflowRsrc.from_db_workflow(db_workflow)

async def get_workflow_job_status(
Expand Down Expand Up @@ -442,7 +473,8 @@ def _push_job_to_rabbitmq(

# Added by Faizan
async def convert_txt_to_nextflow(
self, txt_file: UploadFile, environment: str, auth: HTTPBasicCredentials = Depends(HTTPBasic())
self, txt_file: UploadFile, environment: str, with_mets_server: bool = True,
auth: HTTPBasicCredentials = Depends(HTTPBasic())
):
# Authenticate the user
await self.user_authenticator.user_login(auth)
Expand All @@ -459,5 +491,5 @@ async def convert_txt_to_nextflow(
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)

await validate_oton_with_handling(self.logger, ocrd_process_txt)
await convert_oton_with_handling(self.logger, environment, ocrd_process_txt, nf_script_dest)
return FileResponse(nf_script_dest, filename=f'{oton_id}.nf')
await convert_oton_with_handling(self.logger, ocrd_process_txt, nf_script_dest, environment, with_mets_server)
return FileResponse(nf_script_dest, filename=f'{oton_id}.nf', media_type="application/txt-file")
45 changes: 36 additions & 9 deletions src/server/operandi_server/routers/workflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from fastapi import HTTPException, status
from pathlib import Path
from typing import List

from operandi_utils.database import db_get_workflow, db_get_workflow_job
from operandi_utils.database.models import DBWorkflow, DBWorkflowJob
from operandi_utils.oton import OTONConverter, OCRDValidator
from operandi_utils.oton.constants import PARAMS_KEY_METS_SOCKET_PATH


async def get_db_workflow_with_handling(
Expand Down Expand Up @@ -39,9 +41,8 @@ async def get_db_workflow_job_with_handling(logger, job_id: str, check_local_exi
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
return db_workflow_job


async def nf_script_uses_mets_server_with_handling(
logger, nf_script_path: str, search_string: str = "params.mets_socket"
logger, nf_script_path: str, search_string: str = PARAMS_KEY_METS_SOCKET_PATH
) -> bool:
try:
with open(nf_script_path) as nf_file:
Expand All @@ -56,6 +57,35 @@ async def nf_script_uses_mets_server_with_handling(
logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

async def nf_script_executable_steps_with_handling(logger, nf_script_path: str) -> List[str]:
    """Extract the OCR-D processor executable names referenced in a Nextflow script.

    Scans the script line by line and records the first whitespace-separated
    token containing "ocrd-" on each line (at most one match per line).

    :param logger: logger used for error and info reporting
    :param nf_script_path: path to the Nextflow script to scan
    :return: processor executable names, e.g. ["ocrd-cis-ocropy-binarize"]
    :raises HTTPException: 422 UNPROCESSABLE ENTITY when the script cannot be read
    """
    processor_executables: List[str] = []
    try:
        with open(nf_script_path) as nf_file:
            for line in nf_file:
                for word in line.split(' '):
                    if "ocrd-" in word:
                        # strip() fixes tokens that carry a trailing newline when
                        # the executable is the last token on its line.
                        processor_executables.append(word.strip())
                        break
    except Exception as error:
        message = "Failed to identify processor executables in the provided Nextflow workflow."
        logger.error(f"{message}, error: {error}")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

    # NOTE(review): a mapping from executables to apptainer image names
    # (OCRD_PROCESSOR_EXECUTABLE_TO_IMAGE) was drafted here but is disabled until
    # the mapping constant exists — TODO confirm before enabling.
    logger.info(f"Found processor executables: {processor_executables}")
    return processor_executables

async def validate_oton_with_handling(logger, ocrd_process_txt_path: str):
try:
Expand All @@ -67,20 +97,17 @@ async def validate_oton_with_handling(logger, ocrd_process_txt_path: str):
logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)

async def convert_oton_with_handling(logger, environment: str, ocrd_process_txt_path: str, nf_script_dest_path: str):
async def convert_oton_with_handling(
logger, ocrd_process_txt_path: str, nf_script_dest_path: str, environment: str, with_mets_server: bool
):
environments = ["local", "docker", "apptainer"]
if environment not in environments:
message = f"Unknown environment value: {environment}. Must be one of: {environments}"
logger.error(message)
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
try:
converter = OTONConverter()
if environment == "local":
converter.convert_oton_env_local(str(ocrd_process_txt_path), str(nf_script_dest_path))
elif environment == "docker":
converter.convert_oton_env_docker(str(ocrd_process_txt_path), str(nf_script_dest_path))
elif environment == "apptainer":
converter.convert_oton_env_apptainer(str(ocrd_process_txt_path), str(nf_script_dest_path))
converter.convert_oton(str(ocrd_process_txt_path), str(nf_script_dest_path), environment, with_mets_server)
except ValueError as error:
message = "Failed to convert ocrd process workflow to nextflow workflow"
logger.error(f"{message}, error: {error}")
Expand Down
1 change: 1 addition & 0 deletions src/server/operandi_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ async def include_webapi_routers(self):
self.include_router(RouterDiscovery().router)
self.include_router(RouterUser().router)
workflow_router = RouterWorkflow()
await workflow_router.produce_production_workflows()
await workflow_router.insert_production_workflows()
self.include_router(workflow_router.router)
self.include_router(RouterWorkspace().router)
Expand Down
6 changes: 4 additions & 2 deletions src/utils/operandi_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"is_url_responsive",
"generate_id",
"get_log_file_path_prefix",
"get_nf_workflows_dir",
"get_nf_wfs_dir",
"get_ocrd_process_wfs_dir",
"make_zip_archive",
"receive_file",
"reconfigure_all_loggers",
Expand All @@ -25,7 +26,8 @@
download_mets_file,
is_url_responsive,
generate_id,
get_nf_workflows_dir,
get_nf_wfs_dir,
get_ocrd_process_wfs_dir,
receive_file,
make_zip_archive,
unpack_zip_archive,
Expand Down
Loading

0 comments on commit b9bd746

Please sign in to comment.