Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Switch from single fat ocrd_all image to many slim images in the HPC #20

Merged
merged 16 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ run-tests-utils:

run-tests-broker:
export $(shell sed 's/=.*//' ./tests/.env)
pytest tests/tests_broker/test_*.py -v
pytest tests/tests_broker/test_*.py -s -v

run-tests-harvester:
export $(shell sed 's/=.*//' ./tests/.env)
Expand Down
14 changes: 9 additions & 5 deletions src/broker/operandi_broker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from os.path import join
from pathlib import Path
from sys import exit
from typing import List

from operandi_utils import reconfigure_all_loggers, get_log_file_path_prefix
from operandi_utils.constants import LOG_LEVEL_WORKER, StateJob, StateWorkspace
Expand Down Expand Up @@ -111,6 +112,7 @@ def __callback(self, ch, method, properties, body):

workflow_script_path = Path(workflow_db.workflow_script_path)
nf_uses_mets_server = workflow_db.uses_mets_server
nf_executable_steps = workflow_db.executable_steps
workspace_dir = Path(workspace_db.workspace_dir)
mets_basename = workspace_db.mets_basename
ws_pages_amount = workspace_db.pages_amount
Expand All @@ -132,8 +134,8 @@ def __callback(self, ch, method, properties, body):
workspace_dir=workspace_dir, workspace_base_mets=mets_basename,
workflow_script_path=workflow_script_path, input_file_grp=input_file_grp,
nf_process_forks=nf_process_forks, ws_pages_amount=ws_pages_amount, use_mets_server=nf_uses_mets_server,
file_groups_to_remove=remove_file_grps, cpus=slurm_job_cpus, ram=slurm_job_ram,
partition=slurm_job_partition
nf_executable_steps=nf_executable_steps, file_groups_to_remove=remove_file_grps, cpus=slurm_job_cpus,
ram=slurm_job_ram, partition=slurm_job_partition
)
self.log.info(f"The HPC slurm job was successfully submitted")
except Exception as error:
Expand Down Expand Up @@ -200,7 +202,8 @@ def signal_handler(self, sig, frame):
def prepare_and_trigger_slurm_job(
self, workflow_job_id: str, workspace_id: str, workspace_dir: Path, workspace_base_mets: str,
workflow_script_path: Path, input_file_grp: str, nf_process_forks: int, ws_pages_amount: int,
use_mets_server: bool, file_groups_to_remove: str, cpus: int, ram: int, partition: str
use_mets_server: bool, nf_executable_steps: List[str], file_groups_to_remove: str, cpus: int, ram: int,
partition: str
) -> str:
if self.test_sbatch:
job_deadline_time = HPC_JOB_DEADLINE_TIME_TEST
Expand Down Expand Up @@ -232,8 +235,9 @@ def prepare_and_trigger_slurm_job(
workflow_job_id=workflow_job_id, nextflow_script_path=workflow_script_path,
workspace_id=workspace_id, mets_basename=workspace_base_mets,
input_file_grp=input_file_grp, nf_process_forks=nf_process_forks, ws_pages_amount=ws_pages_amount,
use_mets_server=use_mets_server, file_groups_to_remove=file_groups_to_remove, cpus=cpus, ram=ram,
job_deadline_time=job_deadline_time, partition=partition, qos=qos)
use_mets_server=use_mets_server, nf_executable_steps=nf_executable_steps,
file_groups_to_remove=file_groups_to_remove, cpus=cpus, ram=ram, job_deadline_time=job_deadline_time,
partition=partition, qos=qos)
except Exception as error:
db_stats = sync_db_increase_processing_stats(
find_user_id=self.current_message_user_id, pages_failed=ws_pages_amount)
Expand Down
51 changes: 41 additions & 10 deletions src/server/operandi_server/routers/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from starlette.status import HTTP_404_NOT_FOUND

from operandi_utils import get_nf_workflows_dir
from operandi_utils import get_nf_wfs_dir, get_ocrd_process_wfs_dir
from operandi_utils.constants import AccountType, ServerApiTag, StateJob, StateWorkspace
from operandi_utils.database import (
db_create_workflow, db_create_workflow_job, db_get_hpc_slurm_job, db_get_workflow, db_update_workspace,
db_increase_processing_stats_with_handling)
from operandi_utils.oton import OTONConverter
from operandi_utils.rabbitmq import (
get_connection_publisher, RABBITMQ_QUEUE_JOB_STATUSES, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS)
from operandi_server.constants import (
Expand All @@ -30,7 +31,7 @@
get_db_workflow_job_with_handling,
get_db_workflow_with_handling,
nf_script_uses_mets_server_with_handling,
validate_oton_with_handling
validate_oton_with_handling, nf_script_executable_steps_with_handling
)
from .workspace_utils import check_if_file_group_exists_with_handling, get_db_workspace_with_handling
from .user import RouterUser
Expand Down Expand Up @@ -133,10 +134,35 @@ async def _push_status_request_to_rabbitmq(self, job_id: str):
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=message)

async def insert_production_workflows(self, production_workflows_dir: Path = get_nf_workflows_dir()):
async def produce_production_workflows(
    self,
    ocrd_process_wf_dir: Path = get_ocrd_process_wfs_dir(),
    production_nf_wfs_dir: Path = get_nf_wfs_dir()
):
    """Convert every ocrd-process workflow (.txt) found in ``ocrd_process_wf_dir``
    into Nextflow scripts written to ``production_nf_wfs_dir``.

    Two variants are produced per source workflow: one without a mets server
    (``<stem>.nf``) and one with a mets server (``<stem>_with_MS.nf``).
    Non-files and files without a ``.txt`` extension are skipped with a log note.
    """
    converter = OTONConverter()
    for wf_path in ocrd_process_wf_dir.iterdir():
        if not wf_path.is_file():
            self.logger.info(f"Skipping non-file path: {wf_path}")
            continue
        if wf_path.suffix != '.txt':
            self.logger.info(f"Skipping non .txt extension file path: {wf_path}")
            continue
        self.logger.info(f"Converting to Nextflow workflow the ocrd process workflow: {wf_path}")
        # Emit both variants: (file name suffix, mets server flag, log wording)
        for name_suffix, with_ms, wording in (("", False, "without"), ("_with_MS", True, "with")):
            nf_path = Path(production_nf_wfs_dir, f"{wf_path.stem}{name_suffix}.nf")
            converter.convert_oton(
                input_path=wf_path, output_path=str(nf_path), environment="apptainer",
                with_mets_server=with_ms)
            self.logger.info(f"Converted to a Nextflow file {wording} a mets server: {nf_path}")

async def insert_production_workflows(self, production_nf_wfs_dir: Path = get_nf_wfs_dir()):
wf_detail = "Workflow provided by the Operandi Server"
self.logger.info(f"Inserting production workflows for Operandi from: {production_workflows_dir}")
for path in production_workflows_dir.iterdir():
self.logger.info(f"Inserting production workflows for Operandi from: {production_nf_wfs_dir}")
for path in production_nf_wfs_dir.iterdir():
if not path.is_file():
self.logger.info(f"Skipping non-file path: {path}")
continue
Expand All @@ -150,11 +176,14 @@ async def insert_production_workflows(self, production_workflows_dir: Path = get
nf_script_dest = join(workflow_dir, path.name)
copyfile(src=path, dst=nf_script_dest)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
self.logger.info(f"Inserting: {workflow_id}, uses_mets_server: {uses_mets_server}, script path: {nf_script_dest}")
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
self.logger.info(
f"Inserting: {workflow_id}, uses_mets_server: {uses_mets_server}, script path: {nf_script_dest}")
await db_create_workflow(
user_id="Operandi Server",
workflow_id=workflow_id, workflow_dir=workflow_dir, workflow_script_path=nf_script_dest,
workflow_script_base=path.name, uses_mets_server=uses_mets_server, details=wf_detail)
workflow_script_base=path.name, uses_mets_server=uses_mets_server, executable_steps=executable_steps,
details=wf_detail)
self.production_workflows.append(workflow_id)

async def list_workflows(self, auth: HTTPBasicCredentials = Depends(HTTPBasic())) -> List[WorkflowRsrc]:
Expand Down Expand Up @@ -204,10 +233,11 @@ async def upload_workflow_script(
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
db_workflow = await db_create_workflow(
user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir,
workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename,
uses_mets_server=uses_mets_server, details=details)
uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details)
return WorkflowRsrc.from_db_workflow(db_workflow)

async def update_workflow_script(
Expand Down Expand Up @@ -239,10 +269,11 @@ async def update_workflow_script(
self.logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
uses_mets_server = await nf_script_uses_mets_server_with_handling(self.logger, nf_script_dest)
executable_steps = await nf_script_executable_steps_with_handling(self.logger, nf_script_dest)
db_workflow = await db_create_workflow(
user_id=py_user_action.user_id, workflow_id=workflow_id, workflow_dir=workflow_dir,
workflow_script_path=nf_script_dest, workflow_script_base=nextflow_script.filename,
uses_mets_server=uses_mets_server, details=details)
uses_mets_server=uses_mets_server, executable_steps=executable_steps, details=details)
return WorkflowRsrc.from_db_workflow(db_workflow)

async def get_workflow_job_status(
Expand Down Expand Up @@ -461,4 +492,4 @@ async def convert_txt_to_nextflow(

await validate_oton_with_handling(self.logger, ocrd_process_txt)
await convert_oton_with_handling(self.logger, ocrd_process_txt, nf_script_dest, environment, with_mets_server)
return FileResponse(nf_script_dest, filename=f'{oton_id}.nf')
return FileResponse(nf_script_dest, filename=f'{oton_id}.nf', media_type="application/txt-file")
34 changes: 32 additions & 2 deletions src/server/operandi_server/routers/workflow_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from fastapi import HTTPException, status
from pathlib import Path
from typing import List

from operandi_utils.database import db_get_workflow, db_get_workflow_job
from operandi_utils.database.models import DBWorkflow, DBWorkflowJob
from operandi_utils.oton import OTONConverter, OCRDValidator
from operandi_utils.oton.constants import PARAMS_KEY_METS_SOCKET_PATH


async def get_db_workflow_with_handling(
Expand Down Expand Up @@ -39,9 +41,8 @@ async def get_db_workflow_job_with_handling(logger, job_id: str, check_local_exi
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=message)
return db_workflow_job


async def nf_script_uses_mets_server_with_handling(
logger, nf_script_path: str, search_string: str = "params.mets_socket"
logger, nf_script_path: str, search_string: str = PARAMS_KEY_METS_SOCKET_PATH
) -> bool:
try:
with open(nf_script_path) as nf_file:
Expand All @@ -56,6 +57,35 @@ async def nf_script_uses_mets_server_with_handling(
logger.error(f"{message}, error: {error}")
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

async def nf_script_executable_steps_with_handling(logger, nf_script_path: str) -> List[str]:
    """Extract the ocrd processor executable names used in a Nextflow script.

    Scans the script line by line and records the first token containing
    "ocrd-" found on each line (the generated scripts invoke at most one
    processor per line). Duplicates are preserved in order of appearance so
    repeated workflow steps stay visible to the caller.

    :param logger: logger used for error/info reporting
    :param nf_script_path: path to the Nextflow script file
    :return: processor executable names, e.g. ["ocrd-cis-ocropy-binarize"]
    :raises HTTPException: 422 if the file cannot be read/parsed
    """
    processor_executables: List[str] = []
    try:
        with open(nf_script_path) as nf_file:
            for line in nf_file:
                # split() with no argument splits on any whitespace run, so
                # tokens carry no trailing "\n" — split(' ') left the newline
                # attached to the last token of every line, corrupting the
                # extracted executable names.
                for word in line.split():
                    if "ocrd-" not in word:
                        continue
                    # Drop surrounding quotes so the name is usable as a
                    # lookup key (e.g. into the processor-to-image mapping).
                    processor_executables.append(word.strip('"\''))
                    break
    except Exception as error:
        message = "Failed to identify processor executables in the provided Nextflow workflow."
        logger.error(f"{message}, error: {error}")
        raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=message)

    logger.info(f"Found processor executables: {processor_executables}")
    return processor_executables

async def validate_oton_with_handling(logger, ocrd_process_txt_path: str):
try:
Expand Down
1 change: 1 addition & 0 deletions src/server/operandi_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ async def include_webapi_routers(self):
self.include_router(RouterDiscovery().router)
self.include_router(RouterUser().router)
workflow_router = RouterWorkflow()
await workflow_router.produce_production_workflows()
await workflow_router.insert_production_workflows()
self.include_router(workflow_router.router)
self.include_router(RouterWorkspace().router)
Expand Down
6 changes: 4 additions & 2 deletions src/utils/operandi_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
"is_url_responsive",
"generate_id",
"get_log_file_path_prefix",
"get_nf_workflows_dir",
"get_nf_wfs_dir",
"get_ocrd_process_wfs_dir",
"make_zip_archive",
"receive_file",
"reconfigure_all_loggers",
Expand All @@ -25,7 +26,8 @@
download_mets_file,
is_url_responsive,
generate_id,
get_nf_workflows_dir,
get_nf_wfs_dir,
get_ocrd_process_wfs_dir,
receive_file,
make_zip_archive,
unpack_zip_archive,
Expand Down
76 changes: 76 additions & 0 deletions src/utils/operandi_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"LOG_LEVEL_SERVER",
"LOG_LEVEL_WORKER",
"MODULE_TYPES",
"OCRD_PROCESSOR_EXECUTABLE_TO_IMAGE",
"OLA_HD_BAG_ENDPOINT",
"OLA_HD_USER",
"OLA_HD_PASSWORD",
Expand Down Expand Up @@ -165,3 +166,78 @@ class StateWorkspace(str, Enum):
TRANSFERRING_TO_HPC = "TRANSFERRING_TO_HPC"
TRANSFERRING_FROM_HPC = "TRANSFERRING_FROM_HPC"
UNSET = "UNSET"

# TODO: Find a more optimal way of achieving this dynamically
# Table of apptainer (SIF) images and the OCR-D processor executables each
# image provides. Kept image-first so every image name appears once, then
# inverted below into the executable -> image lookup the code base uses.
_OCRD_IMAGE_TO_EXECUTABLES = {
    "ocrd_core.sif": ("ocrd",),
    "ocrd_tesserocr.sif": (
        "ocrd-tesserocr-crop", "ocrd-tesserocr-deskew", "ocrd-tesserocr-recognize",
        "ocrd-tesserocr-segment", "ocrd-tesserocr-segment-line", "ocrd-tesserocr-segment-region",
        "ocrd-tesserocr-segment-table", "ocrd-tesserocr-segment-word", "ocrd-tesserocr-fontshape",
        "ocrd-tesserocr-binarize",
    ),
    "ocrd_cis.sif": (
        "ocrd-cis-ocropy-binarize", "ocrd-cis-ocropy-denoise", "ocrd-cis-ocropy-deskew",
        "ocrd-cis-ocropy-dewarp", "ocrd-cis-ocropy-segment", "ocrd-cis-ocropy-resegment",
        "ocrd-cis-ocropy-clip", "ocrd-cis-ocropy-recognize", "ocrd-cis-ocropy-train",
        "ocrd-cis-align", "ocrd-cis-postcorrect",
    ),
    "ocrd_kraken.sif": ("ocrd-kraken-recognize", "ocrd-kraken-segment", "ocrd-kraken-binarize"),
    "ocrd_wrap.sif": (
        "ocrd-preprocess-image", "ocrd-skimage-normalize", "ocrd-skimage-binarize",
        "ocrd-skimage-denoise", "ocrd-skimage-denoise-raw",
    ),
    "ocrd_calamari.sif": ("ocrd-calamari-recognize",),
    "ocrd_olena.sif": ("ocrd-olena-binarize",),
    "ocrd_dinglehopper.sif": ("ocrd-dinglehopper",),
    "ocrd_eynollah.sif": ("ocrd-eynollah-segment",),
    "ocrd_fileformat.sif": ("ocrd-fileformat-transform",),
    "ocrd_nmalign.sif": ("ocrd-nmalign-merge",),
    "ocrd_segment.sif": (
        "ocrd-segment-extract-glyphs", "ocrd-segment-extract-lines", "ocrd-segment-extract-pages",
        "ocrd-segment-extract-regions", "ocrd-segment-extract-words", "ocrd-segment-from-coco",
        "ocrd-segment-from-masks", "ocrd-segment-project", "ocrd-segment-repair",
        "ocrd-segment-replace-original", "ocrd-segment-replace-page", "ocrd-segment-replace-text",
        "ocrd-segment-evaluate",
    ),
    "ocrd_anybaseocr.sif": (
        "ocrd-anybaseocr-dewarp", "ocrd-anybaseocr-crop", "ocrd-anybaseocr-binarize",
        "ocrd-anybaseocr-layout-analysis", "ocrd-anybaseocr-textline", "ocrd-anybaseocr-tiseg",
        "ocrd-anybaseocr-block-segmentation", "ocrd-anybaseocr-deskew",
    ),
    "ocrd_sbb_binarization.sif": ("ocrd-sbb-binarize",),
    "ocrd_detectron2.sif": ("ocrd-detectron2-segment",),
    "ocrd_froc.sif": ("ocrd-froc",),
    "ocrd_pagetopdf.sif": ("ocrd-pagetopdf",),
    "ocrd_keraslm.sif": ("ocrd-keraslm-rate",),
    "ocrd_docstruct.sif": ("ocrd-docstruct",),
    "ocrd_doxa.sif": ("ocrd-doxa-binarize",),
    "ocrd_im6convert.sif": ("ocrd-im6convert",),
    "ocrd_olahd-client.sif": ("ocrd-olahd-client",),
    "ocrd_cor-asv-ann.sif": (
        "ocrd-cor-asv-ann-mark", "ocrd-cor-asv-ann-align", "ocrd-cor-asv-ann-evaluate",
        "ocrd-cor-asv-ann-join", "ocrd-cor-asv-ann-process",
    ),
}

# Lookup used elsewhere: OCR-D processor executable name -> apptainer image file
OCRD_PROCESSOR_EXECUTABLE_TO_IMAGE = {
    executable: image
    for image, executables in _OCRD_IMAGE_TO_EXECUTABLES.items()
    for executable in executables
}
Loading