From 4a3f4f6bf7a2ccee8d4b59838d3b4a7880bc4460 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Thu, 4 Jan 2024 11:15:57 -0700 Subject: [PATCH 01/22] Adjustments to API to accommodate needs in alchemiscale-fah --- alchemiscale/compute/api.py | 19 +++++++++++++++++++ alchemiscale/compute/client.py | 10 ++++++++-- alchemiscale/compute/service.py | 2 +- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 8679c3a1..4037008b 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -195,6 +195,25 @@ def claim_taskhub_tasks( @router.get("/tasks/{task_scoped_key}/transformation", response_class=GufeJSONResponse) def get_task_transformation( + task_scoped_key, + *, + n4js: Neo4jStore = Depends(get_n4js_depends), + token: TokenData = Depends(get_token_data_depends), +): + sk = ScopedKey.from_str(task_scoped_key) + validate_scopes(sk.scope, token) + + transformation: ScopedKey + + transformation, protocoldagresultref = n4js.get_task_transformation( + task=task_scoped_key, + return_gufe=False, + ) + + return str(transformation) + +@router.get("/tasks/{task_scoped_key}/transformation/gufe", response_class=GufeJSONResponse) +def retrieve_task_transformation( task_scoped_key, *, n4js: Neo4jStore = Depends(get_n4js_depends), diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 59809494..b8689dd9 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -80,11 +80,17 @@ def claim_taskhub_tasks( return [ScopedKey.from_str(t) if t is not None else None for t in tasks] - def get_task_transformation( + def get_task_transformation(self, task: ScopedKey) -> ScopedKey: + """Get the Transformation associated with the given Task.""" + transformation = self._get_resource(f"/tasks/{task}/transformation") + return ScopedKey.from_str(transformation) + + + def retrieve_task_transformation( self, task: ScopedKey ) -> Tuple[Transformation, Optional[ProtocolDAGResult]]: transformation, protocoldagresult = self._get_resource( - f"tasks/{task}/transformation" + f"tasks/{task}/transformation/gufe" ) return ( diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index 9bc95b6d..f30fcfb7 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -284,7 +284,7 @@ def task_to_protocoldag( """ - transformation, extends_protocoldagresult = self.client.get_task_transformation( + transformation, extends_protocoldagresult = self.client.retrieve_task_transformation( task ) From 4627c4106891ee0cd297498f8762a589f0732ae2 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Fri, 5 Jan 2024 12:23:50 -0700 Subject: [PATCH 02/22] Test fixes, compute service settings refactor, and addition of protocol selection --- alchemiscale/compute/api.py | 2 +- alchemiscale/compute/client.py | 7 +- alchemiscale/compute/service.py | 135 +++++------------- alchemiscale/compute/settings.py | 91 ++++++++++++ .../compute/client/test_compute_client.py | 6 +- .../tests/integration/compute/conftest.py | 2 +- .../integration/compute/test_compute_api.py | 4 +- .../configs/synchronous-compute-settings.yaml | 4 + 8 files changed, 143 insertions(+), 108 deletions(-) create mode 100644 alchemiscale/compute/settings.py diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index d6a7b301..613ff417 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -211,7 +211,7 @@ def get_task_transformation( return str(transformation) -@router.get("/tasks/{task_scoped_key}/transformation/gufe", response_class=GufeJSONResponse) +@router.get("/tasks/{task_scoped_key}/transformation/gufe") def retrieve_task_transformation( task_scoped_key, *, diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index b8689dd9..c12041db 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -72,10 +72,13 @@ def query_taskhubs( return taskhubs def claim_taskhub_tasks( - self, taskhub: ScopedKey, compute_service_id: ComputeServiceID, count: int = 1 + self, taskhub: ScopedKey, + compute_service_id: ComputeServiceID, + count: int = 1, + protocols: Optional[List[str]] = None ) -> Task: """Claim a `Task` from the specified `TaskHub`""" - data = dict(compute_service_id=str(compute_service_id), count=count) + data = dict(compute_service_id=str(compute_service_id), count=count, protocols=protocols) tasks = self._post_resource(f"taskhubs/{taskhub}/claim", data) return [ScopedKey.from_str(t) if t is not None else None for t in tasks] diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index f30fcfb7..0a32b270 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -24,6 +24,7 @@ from gufe.protocols.protocoldag import execute_DAG, ProtocolDAG, ProtocolDAGResult from .client import AlchemiscaleComputeClient +from .settings import ComputeServiceSettings from ..storage.models import Task, TaskHub, ComputeServiceID from ..models import Scope, ScopedKey @@ -75,112 +76,39 @@ class SynchronousComputeService: def __init__( self, - api_url: str, - identifier: str, - key: str, - name: str, - shared_basedir: os.PathLike, - scratch_basedir: os.PathLike, - keep_shared: bool = False, - keep_scratch: bool = False, - n_retries: int = 3, - sleep_interval: int = 30, - heartbeat_interval: int = 300, - scopes: Optional[List[Scope]] = None, - claim_limit: int = 1, - loglevel="WARN", - logfile: Optional[Path] = None, - client_max_retries=5, - client_retry_base_seconds=2.0, - client_retry_max_seconds=60.0, - client_verify=True, + settings: ComputeServiceSettings ): - """Create a `SynchronousComputeService` instance. + """Create a `SynchronousComputeService` instance.""" + self.settings = settings - Parameters - ---------- - api_url - URL of the compute API to execute Tasks for. - identifier - Identifier for the compute identity used for authentication. - key - Credential for the compute identity used for authentication. - name - The name to give this compute service; used for Task provenance, so - typically set to a distinct value to distinguish different compute - resources, e.g. different hosts or HPC clusters. - shared_basedir - Filesystem path to use for `ProtocolDAG` `shared` space. - scratch_basedir - Filesystem path to use for `ProtocolUnit` `scratch` space. - keep_shared - If True, don't remove shared directories for `ProtocolDAG`s after - completion. - keep_scratch - If True, don't remove scratch directories for `ProtocolUnit`s after - completion. - n_retries - Number of times to attempt a given Task on failure. - sleep_interval - Time in seconds to sleep if no Tasks claimed from compute API. - heartbeat_interval - Frequency at which to send heartbeats to compute API. - scopes - Scopes to limit Task claiming to; defaults to all Scopes accessible - by compute identity. - claim_limit - Maximum number of Tasks to claim at a time from a TaskHub. - loglevel - The loglevel at which to report; see the :mod:`logging` docs for - available levels. - logfile - Path to file for logging output; if not set, logging will only go - to STDOUT. - client_max_retries - Maximum number of times to retry a request. In the case the API - service is unresponsive an expoenential backoff is applied with - retries until this number is reached. If set to -1, retries will - continue indefinitely until success. - client_retry_base_seconds - The base number of seconds to use for exponential backoff. - Must be greater than 1.0. - client_retry_max_seconds - Maximum number of seconds to sleep between retries; avoids runaway - exponential backoff while allowing for many retries. - client_verify - Whether to verify SSL certificate presented by the API server. - - """ - self.api_url = api_url - self.name = name - self.sleep_interval = sleep_interval - self.heartbeat_interval = heartbeat_interval - self.claim_limit = claim_limit + self.api_url = self.settings.api_url + self.name = self.settings.name + self.sleep_interval = self.settings.sleep_interval + self.heartbeat_interval = self.settings.heartbeat_interval + self.claim_limit = self.settings.claim_limit self.client = AlchemiscaleComputeClient( - api_url, - identifier, - key, - max_retries=client_max_retries, - retry_base_seconds=client_retry_base_seconds, - retry_max_seconds=client_retry_max_seconds, - verify=client_verify, + self.settings.api_url, + self.settings.identifier, + self.settings.key, + max_retries=self.settings.client_max_retries, + retry_base_seconds=self.settings.client_retry_base_seconds, + retry_max_seconds=self.settings.client_retry_max_seconds, + verify=self.settings.client_verify, ) - if scopes is None: + if self.settings.scopes is None: self.scopes = [Scope()] else: - self.scopes = scopes + self.scopes = self.settings.scopes - self.shared_basedir = Path(shared_basedir).absolute() + self.shared_basedir = Path(self.settings.shared_basedir).absolute() self.shared_basedir.mkdir(exist_ok=True) - self.keep_shared = keep_shared + self.keep_shared = self.settings.keep_shared - self.scratch_basedir = Path(scratch_basedir).absolute() + self.scratch_basedir = Path(self.settings.scratch_basedir).absolute() self.scratch_basedir.mkdir(exist_ok=True) - self.keep_scratch = keep_scratch - - self.n_retries = n_retries + self.keep_scratch = self.settings.keep_scratch self.scheduler = sched.scheduler(time.monotonic, time.sleep) @@ -193,7 +121,7 @@ def __init__( # logging extra = {"compute_service_id": str(self.compute_service_id)} logger = logging.getLogger("AlchemiscaleSynchronousComputeService") - logger.setLevel(loglevel) + logger.setLevel(self.settings.loglevel) formatter = logging.Formatter( "[%(asctime)s] [%(compute_service_id)s] [%(levelname)s] %(message)s" @@ -204,8 +132,8 @@ def __init__( sh.setFormatter(formatter) logger.addHandler(sh) - if logfile is not None: - fh = logging.FileHandler(logfile) + if self.settings.logfile is not None: + fh = logging.FileHandler(self.settings.logfile) fh.setFormatter(formatter) logger.addHandler(fh) @@ -232,11 +160,19 @@ def heartbeat(self): self.beat() time.sleep(self.heartbeat_interval) - def claim_tasks(self, count=1) -> List[Optional[ScopedKey]]: + def claim_tasks(self, count=1, protocols: Optional[List[str]] = None) -> List[Optional[ScopedKey]]: """Get a Task to execute from compute API. Returns `None` if no Task was available matching service configuration. + Parameters + ---------- + count + The maximum number of Tasks to claim. + protocols + Protocol names to restrict Task claiming to. `None` means no restriction. + Regex patterns are allowed. + """ # list of tasks to return tasks = [] @@ -261,6 +197,7 @@ def claim_tasks(self, count=1) -> List[Optional[ScopedKey]]: taskhub, compute_service_id=self.compute_service_id, count=(count - len(tasks)), + protocols=protocols ) # gather up claimed tasks, if present @@ -341,7 +278,7 @@ def execute(self, task: ScopedKey) -> ScopedKey: scratch_basedir=scratch, keep_scratch=self.keep_scratch, raise_error=False, - n_retries=self.n_retries, + n_retries=self.settings.n_retries, ) finally: if not self.keep_shared: diff --git a/alchemiscale/compute/settings.py b/alchemiscale/compute/settings.py new file mode 100644 index 00000000..0afa9c9f --- /dev/null +++ b/alchemiscale/compute/settings.py @@ -0,0 +1,91 @@ +import os +from typing import Union, Optional, List, Dict, Tuple +from pydantic import BaseModel, Field + +from ..models import Scope, ScopedKey + + +class ComputeServiceSettings(BaseModel): + """Core settings schema for a compute service.""" + + api_url: str = Field( + ..., description="URL of the compute API to execute Tasks for." + ) + identifier: str = Field( + ..., description="Identifier for the compute identity used for authentication." + ) + key: str = Field( + ..., description="Credential for the compute identity used for authentication." + ) + name: str = Field( + ..., + description=( + "The name to give this compute service; used for Task provenance, so " + "typically set to a distinct value to distinguish different compute " + "resources, e.g. different hosts or HPC clusters." + ), + ) + shared_basedir: os.PathLike = Field( + ..., description="Filesystem path to use for `ProtocolDAG` `shared` space." + ) + scratch_basedir: os.PathLike = Field( + ..., description="Filesystem path to use for `ProtocolUnit` `scratch` space." + ) + keep_shared: bool = Field( + False, + description="If True, don't remove shared directories for `ProtocolDAG`s after completion.", + ) + keep_scratch: bool = Field( + False, + description="If True, don't remove scratch directories for `ProtocolUnit`s after completion.", + ) + n_retries: int = Field( + 3, + description="Number of times to attempt a given Task on failure.", + ) + sleep_interval: int = Field( + 30, description="Time in seconds to sleep if no Tasks claimed from compute API." + ) + heartbeat_interval: int = Field( + 300, description="Frequency at which to send heartbeats to compute API." + ) + scopes: Optional[List[Scope]] = Field( + None, + description="Scopes to limit Task claiming to; defaults to all Scopes accessible by compute identity.", + ) + protocols: Optional[List[str]] = Field( + None, + description="Names of Protocols to run with this service; regex patterns are allowed. `None` means no restriction", + ) + claim_limit: int = Field( + 1000, description="Maximum number of Tasks to claim at a time from a TaskHub." + ) + loglevel: str = Field( + "WARN", + description="The loglevel at which to report; see the :mod:`logging` docs for available levels.", + ) + logfile: Optional[os.PathLike] = Field( + None, + description="Path to file for logging output; if not set, logging will only go to STDOUT.", + ) + client_max_retries: int = Field( + 5, + description=( + "Maximum number of times to retry a request. " + "In the case the API service is unresponsive an expoenential backoff " + "is applied with retries until this number is reached. " + "If set to -1, retries will continue indefinitely until success." + ), + ) + client_retry_base_seconds: float = Field( + 2.0, + description="The base number of seconds to use for exponential backoff. Must be greater than 1.0.", + ) + client_retry_max_seconds: float = Field( + 60.0, + description="Maximum number of seconds to sleep between retries; avoids runaway exponential backoff while allowing for many retries.", + ) + client_verify: bool = Field( + True, + description="Whether to verify SSL certificate presented by the API server.", + ) diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index 37321c05..5c445af1 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -201,7 +201,7 @@ def test_get_task_transformation( ( transformation_, extends_protocoldagresult, - ) = compute_client.get_task_transformation(task_sks[0]) + ) = compute_client.retrieve_task_transformation(task_sks[0]) assert transformation_ == transformation assert extends_protocoldagresult is None @@ -235,7 +235,7 @@ def test_set_task_result( ( transformation_, extends_protocoldagresult, - ) = compute_client.get_task_transformation(task_sks[0]) + ) = compute_client.retrieve_task_transformation(task_sks[0]) assert transformation_ == transformation assert extends_protocoldagresult is None @@ -251,7 +251,7 @@ def test_set_task_result( ( transformation2, extends_protocoldagresult2, - ) = compute_client.get_task_transformation(task_sk2) + ) = compute_client.retrieve_task_transformation(task_sk2) assert transformation2 == transformation_ assert extends_protocoldagresult2 == protocoldagresults[0] diff --git a/alchemiscale/tests/integration/compute/conftest.py b/alchemiscale/tests/integration/compute/conftest.py index 58779b5e..728cd5e6 100644 --- a/alchemiscale/tests/integration/compute/conftest.py +++ b/alchemiscale/tests/integration/compute/conftest.py @@ -147,7 +147,7 @@ def get_token_data_depends_override(): @pytest.fixture -def compute_api_no_auth(s3os, scope_consistent_token_data_depends_override): +def compute_api_no_auth(n4js_preloaded, s3os, scope_consistent_token_data_depends_override): def get_s3os_override(): return s3os diff --git a/alchemiscale/tests/integration/compute/test_compute_api.py b/alchemiscale/tests/integration/compute/test_compute_api.py index 8ab1c7a5..b1ed6c14 100644 --- a/alchemiscale/tests/integration/compute/test_compute_api.py +++ b/alchemiscale/tests/integration/compute/test_compute_api.py @@ -63,13 +63,13 @@ def out_of_scoped_keys(self, n4js_preloaded, network_tyk2, multiple_scopes): assert len(task_sks) > 0 return {"network": network_sk, "taskhub": tq_sk, "tasks": task_sks} - def test_get_task_transformation( + def test_retrieve_task_transformation( self, n4js_preloaded, test_client, scoped_keys, ): - response = test_client.get(f"/tasks/{scoped_keys['tasks'][0]}/transformation") + response = test_client.get(f"/tasks/{scoped_keys['tasks'][0]}/transformation/gufe") assert response.status_code == 200 data = response.json() assert len(data) == 2 diff --git a/devtools/configs/synchronous-compute-settings.yaml b/devtools/configs/synchronous-compute-settings.yaml index a9c29ab5..1712d444 100644 --- a/devtools/configs/synchronous-compute-settings.yaml +++ b/devtools/configs/synchronous-compute-settings.yaml @@ -44,6 +44,10 @@ init: scopes: - '*-*-*' + # Names of Protocols to run with this service; regex patterns are allowed. + # `None` means no restriction + protocols: null + # Maximum number of Tasks to claim at a time from a TaskHub. claim_limit: 1 From 2ad9bed81019f90ae06c5fa20407155e80355f73 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Fri, 5 Jan 2024 12:24:52 -0700 Subject: [PATCH 03/22] Black! --- alchemiscale/compute/api.py | 1 + alchemiscale/compute/client.py | 14 ++++++++------ alchemiscale/compute/service.py | 18 +++++++++--------- alchemiscale/compute/settings.py | 2 +- .../tests/integration/compute/conftest.py | 4 +++- .../integration/compute/test_compute_api.py | 4 +++- 6 files changed, 25 insertions(+), 18 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 613ff417..e9196272 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -211,6 +211,7 @@ def get_task_transformation( return str(transformation) + @router.get("/tasks/{task_scoped_key}/transformation/gufe") def retrieve_task_transformation( task_scoped_key, diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index c12041db..db5223fe 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -72,13 +72,16 @@ def query_taskhubs( return taskhubs def claim_taskhub_tasks( - self, taskhub: ScopedKey, - compute_service_id: ComputeServiceID, - count: int = 1, - protocols: Optional[List[str]] = None + self, + taskhub: ScopedKey, + compute_service_id: ComputeServiceID, + count: int = 1, + protocols: Optional[List[str]] = None, ) -> Task: """Claim a `Task` from the specified `TaskHub`""" - data = dict(compute_service_id=str(compute_service_id), count=count, protocols=protocols) + data = dict( + compute_service_id=str(compute_service_id), count=count, protocols=protocols + ) tasks = self._post_resource(f"taskhubs/{taskhub}/claim", data) return [ScopedKey.from_str(t) if t is not None else None for t in tasks] @@ -88,7 +91,6 @@ def get_task_transformation(self, task: ScopedKey) -> ScopedKey: transformation = self._get_resource(f"/tasks/{task}/transformation") return ScopedKey.from_str(transformation) - def retrieve_task_transformation( self, task: ScopedKey ) -> Tuple[Transformation, Optional[ProtocolDAGResult]]: diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index 0a32b270..ec253229 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -74,10 +74,7 @@ class SynchronousComputeService: """ - def __init__( - self, - settings: ComputeServiceSettings - ): + def __init__(self, settings: ComputeServiceSettings): """Create a `SynchronousComputeService` instance.""" self.settings = settings @@ -160,7 +157,9 @@ def heartbeat(self): self.beat() time.sleep(self.heartbeat_interval) - def claim_tasks(self, count=1, protocols: Optional[List[str]] = None) -> List[Optional[ScopedKey]]: + def claim_tasks( + self, count=1, protocols: Optional[List[str]] = None + ) -> List[Optional[ScopedKey]]: """Get a Task to execute from compute API. Returns `None` if no Task was available matching service configuration. @@ -197,7 +196,7 @@ def claim_tasks(self, count=1, protocols: Optional[List[str]] = None) -> List[Op taskhub, compute_service_id=self.compute_service_id, count=(count - len(tasks)), - protocols=protocols + protocols=protocols, ) # gather up claimed tasks, if present @@ -221,9 +220,10 @@ def task_to_protocoldag( """ - transformation, extends_protocoldagresult = self.client.retrieve_task_transformation( - task - ) + ( + transformation, + extends_protocoldagresult, + ) = self.client.retrieve_task_transformation(task) protocoldag = transformation.create( extends=extends_protocoldagresult, diff --git a/alchemiscale/compute/settings.py b/alchemiscale/compute/settings.py index 0afa9c9f..f4bbe62f 100644 --- a/alchemiscale/compute/settings.py +++ b/alchemiscale/compute/settings.py @@ -54,7 +54,7 @@ class ComputeServiceSettings(BaseModel): description="Scopes to limit Task claiming to; defaults to all Scopes accessible by compute identity.", ) protocols: Optional[List[str]] = Field( - None, + None, description="Names of Protocols to run with this service; regex patterns are allowed. `None` means no restriction", ) claim_limit: int = Field( diff --git a/alchemiscale/tests/integration/compute/conftest.py b/alchemiscale/tests/integration/compute/conftest.py index 728cd5e6..84fd3e0b 100644 --- a/alchemiscale/tests/integration/compute/conftest.py +++ b/alchemiscale/tests/integration/compute/conftest.py @@ -147,7 +147,9 @@ def get_token_data_depends_override(): @pytest.fixture -def compute_api_no_auth(n4js_preloaded, s3os, scope_consistent_token_data_depends_override): +def compute_api_no_auth( + n4js_preloaded, s3os, scope_consistent_token_data_depends_override +): def get_s3os_override(): return s3os diff --git a/alchemiscale/tests/integration/compute/test_compute_api.py b/alchemiscale/tests/integration/compute/test_compute_api.py index b1ed6c14..19cda547 100644 --- a/alchemiscale/tests/integration/compute/test_compute_api.py +++ b/alchemiscale/tests/integration/compute/test_compute_api.py @@ -69,7 +69,9 @@ def test_retrieve_task_transformation( test_client, scoped_keys, ): - response = test_client.get(f"/tasks/{scoped_keys['tasks'][0]}/transformation/gufe") + response = test_client.get( + f"/tasks/{scoped_keys['tasks'][0]}/transformation/gufe" + ) assert response.status_code == 200 data = response.json() assert len(data) == 2 From 8c1d7ee496bdb2b27bb675ffeafb4fad4878f8af Mon Sep 17 00:00:00 2001 From: David Dotson Date: Fri, 1 Mar 2024 17:15:43 -0700 Subject: [PATCH 04/22] Added __init__.py files so alchemiscale_fah can import components --- alchemiscale/tests/__init__.py | 0 alchemiscale/tests/integration/__init__.py | 0 alchemiscale/tests/integration/compute/client/conftest.py | 1 - alchemiscale/tests/unit/__init__.py | 0 4 files changed, 1 deletion(-) create mode 100644 alchemiscale/tests/__init__.py create mode 100644 alchemiscale/tests/integration/__init__.py create mode 100644 alchemiscale/tests/unit/__init__.py diff --git a/alchemiscale/tests/__init__.py b/alchemiscale/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alchemiscale/tests/integration/__init__.py b/alchemiscale/tests/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alchemiscale/tests/integration/compute/client/conftest.py b/alchemiscale/tests/integration/compute/client/conftest.py index f4c92f8a..aebc5fe1 100644 --- a/alchemiscale/tests/integration/compute/client/conftest.py +++ b/alchemiscale/tests/integration/compute/client/conftest.py @@ -3,7 +3,6 @@ from time import sleep import uvicorn -import requests from alchemiscale.settings import get_base_api_settings from alchemiscale.base.api import get_n4js_depends, get_s3os_depends diff --git a/alchemiscale/tests/unit/__init__.py b/alchemiscale/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b From 60194fd82648e3d5a2eb1b9e940c7b0aa44e0c81 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 15 Apr 2024 21:58:43 -0700 Subject: [PATCH 05/22] Add pydantic option arbitrary_types_allowed for comute settings --- alchemiscale/compute/settings.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/alchemiscale/compute/settings.py b/alchemiscale/compute/settings.py index f4bbe62f..a7d42293 100644 --- a/alchemiscale/compute/settings.py +++ b/alchemiscale/compute/settings.py @@ -1,4 +1,4 @@ -import os +from pathlib import Path from typing import Union, Optional, List, Dict, Tuple from pydantic import BaseModel, Field @@ -8,6 +8,9 @@ class ComputeServiceSettings(BaseModel): """Core settings schema for a compute service.""" + class Config: + arbitrary_types_allowed = True + api_url: str = Field( ..., description="URL of the compute API to execute Tasks for." ) @@ -25,10 +28,10 @@ class ComputeServiceSettings(BaseModel): "resources, e.g. different hosts or HPC clusters." ), ) - shared_basedir: os.PathLike = Field( + shared_basedir: Path = Field( ..., description="Filesystem path to use for `ProtocolDAG` `shared` space." ) - scratch_basedir: os.PathLike = Field( + scratch_basedir: Path = Field( ..., description="Filesystem path to use for `ProtocolUnit` `scratch` space." ) keep_shared: bool = Field( @@ -64,7 +67,7 @@ class ComputeServiceSettings(BaseModel): "WARN", description="The loglevel at which to report; see the :mod:`logging` docs for available levels.", ) - logfile: Optional[os.PathLike] = Field( + logfile: Optional[Path] = Field( None, description="Path to file for logging output; if not set, logging will only go to STDOUT.", ) From 02dd08598746c2681e604d1e36ab91b749739b4e Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 15 Apr 2024 22:08:06 -0700 Subject: [PATCH 06/22] Add optional Protocols filtering in claiming code --- alchemiscale/compute/api.py | 2 ++ alchemiscale/storage/cypher.py | 4 ++++ alchemiscale/storage/statestore.py | 26 ++++++++++++++++++++++---- pyproject.toml | 4 ++++ 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 76036f3f..4bd58df6 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -177,6 +177,7 @@ def claim_taskhub_tasks( *, compute_service_id: str = Body(), count: int = Body(), + protocols: Optional[List[str]] = Body(), n4js: Neo4jStore = Depends(get_n4js_depends), token: TokenData = Depends(get_token_data_depends), ): @@ -187,6 +188,7 @@ def claim_taskhub_tasks( taskhub=taskhub_scoped_key, compute_service_id=ComputeServiceID(compute_service_id), count=count, + protocols=protocols ) return [str(t) if t is not None else None for t in tasks] diff --git a/alchemiscale/storage/cypher.py b/alchemiscale/storage/cypher.py index 6ae5a005..23aed988 100644 --- a/alchemiscale/storage/cypher.py +++ b/alchemiscale/storage/cypher.py @@ -21,3 +21,7 @@ def cypher_list_from_scoped_keys(scoped_keys: List[Optional[ScopedKey]]) -> str: if scoped_key: data.append('"' + str(scoped_key) + '"') return "[" + ", ".join(data) + "]" + + +def cypher_or(items): + return "|".join(items) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index c0190f1e..1de70c8b 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -14,7 +14,7 @@ import numpy as np import networkx as nx -from gufe import AlchemicalNetwork, Transformation, NonTransformation, Settings +from gufe import AlchemicalNetwork, Transformation, NonTransformation, Settings, Protocol from gufe.tokenization import GufeTokenizable, GufeKey, JSON_HANDLER from neo4j import Transaction, GraphDatabase, Driver @@ -29,7 +29,7 @@ ) from ..strategies import Strategy from ..models import Scope, ScopedKey -from .cypher import cypher_list_from_scoped_keys +from .cypher import cypher_list_from_scoped_keys, cypher_or from ..security.models import CredentialedEntity from ..settings import Neo4jStoreSettings @@ -1417,7 +1417,11 @@ def get_taskhub_unclaimed_tasks( return [ScopedKey.from_str(t["_scoped_key"]) for t in tasks] def claim_taskhub_tasks( - self, taskhub: ScopedKey, compute_service_id: ComputeServiceID, count: int = 1 + self, + taskhub: ScopedKey, + compute_service_id: ComputeServiceID, + count: int = 1, + protocols: Optional[List[Protocol]] = None, ) -> List[Union[ScopedKey, None]]: """Claim a TaskHub Task. @@ -1438,16 +1442,30 @@ def claim_taskhub_tasks( Unique identifier for the compute service claiming the Tasks for execution. count Claim the given number of Tasks in a single transaction. + protocols + Protocols to restrict Task claiming to. `None` means no restriction. + If an empty list, raises ValueError. """ + if protocols is not None and len(protocols) == 0: + raise ValueError('`protocols` must be either `None` or not empty') q = f""" - MATCH (th:TaskHub {{`_scoped_key`: '{taskhub}'}})-[actions:ACTIONS]-(task:Task) + MATCH (th:TaskHub {{`_scoped_key`: '{taskhub}'}})-[actions:ACTIONS]-(task:Task), WHERE task.status = '{TaskStatusEnum.waiting.value}' AND actions.weight > 0 OPTIONAL MATCH (task)-[:EXTENDS]->(other_task:Task) WITH task, other_task, actions + """ + + # filter down to `protocols`, if specified + if protocols is not None: + q += f""" + MATCH (task)-[:PERFORMS]->(:Transformation)-[:DEPENDS_ON]->(protocol:{cypher_or(protocols)}) + """ + + q += f""" WHERE other_task.status = '{TaskStatusEnum.complete.value}' OR other_task IS NULL RETURN task.`_scoped_key`, task.priority, actions.weight diff --git a/pyproject.toml b/pyproject.toml index 87551cec..b96ed6b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,3 +76,7 @@ distance-dirty = "{base_version}+{distance}.{vcs}{rev}.dirty" method = "git" match = ["*"] default-tag = "0.0.0" + + +[project.entry-points.pytest11] +alchemiscale_fixtures = "alchemiscale.tests.integration.conftest" From 5b873b38fb7f2dafc0163ea368c3d78618da3a01 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 15 Apr 2024 22:08:27 -0700 Subject: [PATCH 07/22] Black! --- alchemiscale/compute/api.py | 2 +- alchemiscale/storage/statestore.py | 10 ++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 4bd58df6..70a13156 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -188,7 +188,7 @@ def claim_taskhub_tasks( taskhub=taskhub_scoped_key, compute_service_id=ComputeServiceID(compute_service_id), count=count, - protocols=protocols + protocols=protocols, ) return [str(t) if t is not None else None for t in tasks] diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 1de70c8b..aa9b0a1f 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -14,7 +14,13 @@ import numpy as np import networkx as nx -from gufe import AlchemicalNetwork, Transformation, NonTransformation, Settings, Protocol +from gufe import ( + AlchemicalNetwork, + Transformation, + NonTransformation, + Settings, + Protocol, +) from gufe.tokenization import GufeTokenizable, GufeKey, JSON_HANDLER from neo4j import Transaction, GraphDatabase, Driver @@ -1448,7 +1454,7 @@ def claim_taskhub_tasks( """ if protocols is not None and len(protocols) == 0: - raise ValueError('`protocols` must be either `None` or not empty') + raise ValueError("`protocols` must be either `None` or not empty") q = f""" MATCH (th:TaskHub {{`_scoped_key`: '{taskhub}'}})-[actions:ACTIONS]-(task:Task), From 6e12ba63cf41e7628432d543e48caee25c07cc7b Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 15 Apr 2024 22:35:59 -0700 Subject: [PATCH 08/22] Getting errors from pytest for fixtures being registered as a plugin already --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index b96ed6b1..ea86f6bf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -78,5 +78,5 @@ match = ["*"] default-tag = "0.0.0" -[project.entry-points.pytest11] -alchemiscale_fixtures = "alchemiscale.tests.integration.conftest" +#[project.entry-points.pytest11] +#alchemiscale_fixtures = "alchemiscale.tests.integration.conftest" From 1755f1f9451faca01faa7187c0c9213855818488 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 29 Apr 2024 17:43:40 -0700 Subject: [PATCH 09/22] Fixes to test suite --- alchemiscale/cli.py | 3 ++- alchemiscale/compute/settings.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/alchemiscale/cli.py b/alchemiscale/cli.py index 1999ef57..c6d096ae 100644 --- a/alchemiscale/cli.py +++ b/alchemiscale/cli.py @@ -362,6 +362,7 @@ def get_settings_override(): def synchronous(config_file): from alchemiscale.models import Scope from alchemiscale.compute.service import SynchronousComputeService + from alchemiscale.compute.settings import ComputeServiceSettings params = yaml.safe_load(config_file) @@ -373,7 +374,7 @@ def synchronous(config_file): Scope.from_str(scope) for scope in params_init["scopes"] ] - service = SynchronousComputeService(**params_init) + service = SynchronousComputeService(ComputeServiceSettings(**params_init)) # add signal handling for signame in {"SIGHUP", "SIGINT", "SIGTERM"}: diff --git a/alchemiscale/compute/settings.py b/alchemiscale/compute/settings.py index a7d42293..fe77214b 100644 --- a/alchemiscale/compute/settings.py +++ b/alchemiscale/compute/settings.py @@ -58,7 +58,7 @@ class Config: ) protocols: Optional[List[str]] = Field( None, - description="Names of Protocols to run with this service; regex patterns are allowed. `None` means no restriction", + description="Names of Protocols to run with this service; `None` means no restriction", ) claim_limit: int = Field( 1000, description="Maximum number of Tasks to claim at a time from a TaskHub." From f78d5adc641546fb7d76145138d8789f87e2da88 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 29 Apr 2024 21:56:27 -0700 Subject: [PATCH 10/22] Fix to statestore for broken tests --- alchemiscale/compute/api.py | 64 +++++++++++++++++++ alchemiscale/compute/client.py | 30 +++++++-- alchemiscale/storage/statestore.py | 3 +- .../compute/client/test_compute_service.py | 20 +++--- 4 files changed, 101 insertions(+), 16 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 70a13156..d3fe7ec7 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -8,6 +8,7 @@ import os import json from datetime import datetime, timedelta +import random from fastapi import FastAPI, APIRouter, Body, Depends from fastapi.middleware.gzip import GZipMiddleware @@ -23,6 +24,7 @@ get_cred_entity, validate_scopes, validate_scopes_query, + minimize_scope_space, _check_store_connectivity, gufe_to_json, GzipRoute, @@ -193,6 +195,68 @@ def claim_taskhub_tasks( return [str(t) if t is not None else None for t in tasks] +@router.post("/claim") +def claim_tasks( + scopes: List[Scope] = Body(), + compute_service_id: str = Body(), + count: int = Body(), + protocols: Optional[List[str]] = Body(), + n4js: Neo4jStore = Depends(get_n4js_depends), + token: TokenData = Depends(get_token_data_depends), +): + # intersect query scopes with accessible scopes in the token + scopes_reduced = minimize_scope_space(scopes) + query_scopes = [] + for scope in scopes_reduced: + query_scopes.append(validate_scopes_query(scope, token)) + + taskhubs = dict() + # query each scope for available taskhubs + # loop might be more removable in the future with a Union like operator on scopes + for single_query_scope in query_scopes: + taskhubs.update_results( + n4js.query_taskhubs( + scope=single_query_scope, return_gufe=True + ) + ) + + # list of tasks to return + tasks = [] + + if len(taskhubs) == 0: + return [] + + # claim tasks from taskhubs based on weight; keep going till we hit our + # total desired task count, or we run out of taskhubs to draw from + while len(tasks) < count and len(taskhubs) > 0: + weights = [th.weight for th in taskhubs.values()] + + if sum(weights) == 0: + break + + # based on weights, choose taskhub to draw from + taskhub: ScopedKey = random.choices( + list(taskhubs.keys()), weights=weights + )[0] + + # claim tasks from the taskhub + claimed_tasks = n4js.client.claim_taskhub_tasks( + taskhub, + compute_service_id=ComputeServiceID(compute_service_id), + count=(count - len(tasks)), + protocols=protocols, + ) + + # gather up claimed tasks, if present + for t in claimed_tasks: + if t is not None: + tasks.append(t) + + # remove this taskhub from the options available; repeat + taskhubs.pop(taskhub) + + return [str(t) if t is not None else None for t in tasks] + @router.get("/tasks/{task_scoped_key}/transformation") def get_task_transformation( diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 9bc349fc..5d716e86 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -35,15 +35,15 @@ class AlchemiscaleComputeClient(AlchemiscaleBaseClient): _exception = AlchemiscaleComputeClientError def register(self, compute_service_id: ComputeServiceID): - res = self._post_resource(f"computeservice/{compute_service_id}/register", {}) + res = self._post_resource(f"/computeservice/{compute_service_id}/register", {}) return ComputeServiceID(res) def deregister(self, compute_service_id: ComputeServiceID): - res = self._post_resource(f"computeservice/{compute_service_id}/deregister", {}) + res = self._post_resource(f"/computeservice/{compute_service_id}/deregister", {}) return ComputeServiceID(res) def heartbeat(self, compute_service_id: ComputeServiceID): - res = self._post_resource(f"computeservice/{compute_service_id}/heartbeat", {}) + res = self._post_resource(f"/computeservice/{compute_service_id}/heartbeat", {}) return ComputeServiceID(res) def list_scopes(self) -> List[Scope]: @@ -81,7 +81,25 @@ def claim_taskhub_tasks( data = dict( compute_service_id=str(compute_service_id), count=count, protocols=protocols ) - tasks = self._post_resource(f"taskhubs/{taskhub}/claim", data) + tasks = self._post_resource(f"/taskhubs/{taskhub}/claim", data) + + return [ScopedKey.from_str(t) if t is not None else None for t in tasks] + + def claim_tasks( + self, + scopes: List[Scope], + compute_service_id: ComputeServiceID, + count: int = 1, + protocols: Optional[List[str]] = None, + ): + """Claim Tasks from TaskHubs within a list of Scopes. + + """ + data = dict( + scopes=[scope.dict() for scope in scopes], + compute_service_id=str(compute_service_id), count=count, protocols=protocols + ) + tasks = self._post_resource("/claim", data) return [ScopedKey.from_str(t) if t is not None else None for t in tasks] @@ -94,7 +112,7 @@ def retrieve_task_transformation( self, task: ScopedKey ) -> Tuple[Transformation, Optional[ProtocolDAGResult]]: transformation, protocoldagresult = self._get_resource( - f"tasks/{task}/transformation/gufe" + f"/tasks/{task}/transformation/gufe" ) return ( @@ -115,6 +133,6 @@ def set_task_result( compute_service_id=str(compute_service_id), ) - pdr_sk = self._post_resource(f"tasks/{task}/results", data) + pdr_sk = self._post_resource(f"/tasks/{task}/results", data) return ScopedKey.from_dict(pdr_sk) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index 8bb2eb9d..237272e5 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1681,7 +1681,7 @@ def claim_taskhub_tasks( raise ValueError("`protocols` must be either `None` or not empty") q = f""" - MATCH (th:TaskHub {{`_scoped_key`: '{taskhub}'}})-[actions:ACTIONS]-(task:Task), + MATCH (th:TaskHub {{`_scoped_key`: '{taskhub}'}})-[actions:ACTIONS]-(task:Task) WHERE task.status = '{TaskStatusEnum.waiting.value}' AND actions.weight > 0 OPTIONAL MATCH (task)-[:EXTENDS]->(other_task:Task) @@ -1693,6 +1693,7 @@ def claim_taskhub_tasks( if protocols is not None: q += f""" MATCH (task)-[:PERFORMS]->(:Transformation)-[:DEPENDS_ON]->(protocol:{cypher_or(protocols)}) + WITH task, other_task, actions """ q += f""" diff --git a/alchemiscale/tests/integration/compute/client/test_compute_service.py b/alchemiscale/tests/integration/compute/client/test_compute_service.py index 9ae4d738..9140ab2a 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_service.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_service.py @@ -11,6 +11,7 @@ from alchemiscale.storage.statestore import Neo4jStore from alchemiscale.storage.objectstore import S3ObjectStore from alchemiscale.compute.service import SynchronousComputeService +from alchemiscale.compute.settings import ComputeServiceSettings class TestSynchronousComputeService: @@ -20,15 +21,16 @@ class TestSynchronousComputeService: def service(self, n4js_preloaded, compute_client, tmpdir): with tmpdir.as_cwd(): return SynchronousComputeService( - api_url=compute_client.api_url, - identifier=compute_client.identifier, - key=compute_client.key, - name="test_compute_service", - shared_basedir=Path("shared").absolute(), - scratch_basedir=Path("scratch").absolute(), - heartbeat_interval=1, - sleep_interval=1, - ) + ComputeServiceSettings( + api_url=compute_client.api_url, + identifier=compute_client.identifier, + key=compute_client.key, + name="test_compute_service", + shared_basedir=Path("shared").absolute(), + scratch_basedir=Path("scratch").absolute(), + heartbeat_interval=1, + sleep_interval=1, + )) def test_heartbeat(self, n4js_preloaded, service): n4js: Neo4jStore = n4js_preloaded From d54bc0bb76bb473ed963257708187dc361c6cb3f Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 29 Apr 2024 22:23:29 -0700 Subject: [PATCH 11/22] Attempting to fix broken tests --- alchemiscale/compute/api.py | 2 +- alchemiscale/compute/service.py | 41 ++----------------- .../compute/client/test_compute_client.py | 26 ++++++++++++ 3 files changed, 31 insertions(+), 38 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index d3fe7ec7..213a88e2 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -200,7 +200,7 @@ def claim_tasks( scopes: List[Scope] = Body(), compute_service_id: str = Body(), count: int = Body(), - protocols: Optional[List[str]] = Body(), + protocols: Optional[List[str]] = Body(None, embed=True), n4js: Neo4jStore = Depends(get_n4js_depends), token: TokenData = Depends(get_token_data_depends), ): diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index d38b0231..d7ced75a 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -173,44 +173,11 @@ def claim_tasks( Regex patterns are allowed. """ - # list of tasks to return - tasks = [] - taskhubs: Dict[ScopedKey, TaskHub] = self.client.query_taskhubs( - scopes=self.scopes, return_gufe=True - ) - - if len(taskhubs) == 0: - return [] - - # claim tasks from taskhubs based on weight; keep going till we hit our - # total desired task count, or we run out of taskhubs to draw from - while len(tasks) < count and len(taskhubs) > 0: - weights = [th.weight for th in taskhubs.values()] - - if sum(weights) == 0: - break - - # based on weights, choose taskhub to draw from - taskhub: List[ScopedKey] = random.choices( - list(taskhubs.keys()), weights=weights - )[0] - - # claim tasks from the taskhub - claimed_tasks = self.client.claim_taskhub_tasks( - taskhub, - compute_service_id=self.compute_service_id, - count=(count - len(tasks)), - protocols=protocols, - ) - - # gather up claimed tasks, if present - for t in claimed_tasks: - if t is not None: - tasks.append(t) - - # remove this taskhub from the options available; repeat - taskhubs.pop(taskhub) + tasks = self.client.claim_tasks(scopes=self.scopes, + compute_service_id=self.compute_service_id, + count=count, + protocols=protocols) return tasks diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index aae19463..955a5f7f 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -182,6 +182,32 @@ def test_claim_taskhub_task( assert task_sks2[0] in remaining_tasks assert task_sks2[1] in remaining_tasks + def test_claim_tasks( + self, + scope_test, + n4js_preloaded, + compute_client: client.AlchemiscaleComputeClient, + compute_service_id, + uvicorn_server, + ): + # register compute service id + compute_client.register(compute_service_id) + + # claim a single task; should get highest priority task + task_sks = compute_client.claim_tasks( + scopes=[scope_test], + compute_service_id=compute_service_id, + ) + all_tasks = n4js_preloaded.query_tasks(scope=scope_test, status='waiting') + priorities = {task_sk: priority for task_sk, priority + in zip(all_tasks, n4js_preloaded.get_task_priority(all_tasks))} + + assert len(task_sks) == 1 + assert task_sks[0] in all_tasks + assert [t.gufe_key for t in task_sks] == [ + t.gufe_key for t in all_tasks if priorities[t] == 1 + ] + def test_get_task_transformation( self, scope_test, From f6a958df238fa34cb047744b8fe03b415244810b Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 29 Apr 2024 23:00:19 -0700 Subject: [PATCH 12/22] Believe I've fixed the tests... --- alchemiscale/compute/api.py | 10 +++++----- .../integration/compute/client/test_compute_client.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 213a88e2..42984cdd 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -179,7 +179,7 @@ def claim_taskhub_tasks( *, compute_service_id: str = Body(), count: int = Body(), - protocols: Optional[List[str]] = Body(), + protocols: Optional[List[str]] = Body(None, embed=True), n4js: Neo4jStore = Depends(get_n4js_depends), token: TokenData = Depends(get_token_data_depends), ): @@ -208,13 +208,13 @@ def claim_tasks( scopes_reduced = minimize_scope_space(scopes) query_scopes = [] for scope in scopes_reduced: - query_scopes.append(validate_scopes_query(scope, token)) + query_scopes.extend(validate_scopes_query(scope, token)) taskhubs = dict() # query each scope for available taskhubs # loop might be more removable in the future with a Union like operator on scopes - for single_query_scope in query_scopes: - taskhubs.update_results( + for single_query_scope in set(query_scopes): + taskhubs.update( n4js.query_taskhubs( scope=single_query_scope, return_gufe=True ) @@ -240,7 +240,7 @@ def claim_tasks( )[0] # claim tasks from the taskhub - claimed_tasks = n4js.client.claim_taskhub_tasks( + claimed_tasks = n4js.claim_taskhub_tasks( taskhub, compute_service_id=ComputeServiceID(compute_service_id), count=(count - len(tasks)), diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index 955a5f7f..389c6adb 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -198,7 +198,7 @@ def test_claim_tasks( scopes=[scope_test], compute_service_id=compute_service_id, ) - all_tasks = n4js_preloaded.query_tasks(scope=scope_test, status='waiting') + all_tasks = n4js_preloaded.query_tasks(scope=scope_test) priorities = {task_sk: priority for task_sk, priority in zip(all_tasks, n4js_preloaded.get_task_priority(all_tasks))} From dc9d32bea0edb135f38b904b10002116d514a204 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Mon, 29 Apr 2024 23:00:37 -0700 Subject: [PATCH 13/22] Black! --- alchemiscale/compute/api.py | 11 +++------- alchemiscale/compute/client.py | 20 ++++++++++-------- alchemiscale/compute/service.py | 10 +++++---- .../compute/client/test_compute_client.py | 12 +++++++---- .../compute/client/test_compute_service.py | 21 ++++++++++--------- 5 files changed, 39 insertions(+), 35 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 42984cdd..9c27bf5f 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -195,6 +195,7 @@ def claim_taskhub_tasks( return [str(t) if t is not None else None for t in tasks] + @router.post("/claim") def claim_tasks( scopes: List[Scope] = Body(), @@ -214,11 +215,7 @@ def claim_tasks( # query each scope for available taskhubs # loop might be more removable in the future with a Union like operator on scopes for single_query_scope in set(query_scopes): - taskhubs.update( - n4js.query_taskhubs( - scope=single_query_scope, return_gufe=True - ) - ) + taskhubs.update(n4js.query_taskhubs(scope=single_query_scope, return_gufe=True)) # list of tasks to return tasks = [] @@ -235,9 +232,7 @@ def claim_tasks( break # based on weights, choose taskhub to draw from - taskhub: ScopedKey = random.choices( - list(taskhubs.keys()), weights=weights - )[0] + taskhub: ScopedKey = random.choices(list(taskhubs.keys()), weights=weights)[0] # claim tasks from the taskhub claimed_tasks = n4js.claim_taskhub_tasks( diff --git a/alchemiscale/compute/client.py b/alchemiscale/compute/client.py index 5d716e86..901a7516 100644 --- a/alchemiscale/compute/client.py +++ b/alchemiscale/compute/client.py @@ -39,7 +39,9 @@ def register(self, compute_service_id: ComputeServiceID): return ComputeServiceID(res) def deregister(self, compute_service_id: ComputeServiceID): - res = self._post_resource(f"/computeservice/{compute_service_id}/deregister", {}) + res = self._post_resource( + f"/computeservice/{compute_service_id}/deregister", {} + ) return ComputeServiceID(res) def heartbeat(self, compute_service_id: ComputeServiceID): @@ -86,19 +88,19 @@ def claim_taskhub_tasks( return [ScopedKey.from_str(t) if t is not None else None for t in tasks] def claim_tasks( - self, + self, scopes: List[Scope], compute_service_id: ComputeServiceID, count: int = 1, protocols: Optional[List[str]] = None, - ): - """Claim Tasks from TaskHubs within a list of Scopes. - - """ + ): + """Claim Tasks from TaskHubs within a list of Scopes.""" data = dict( - scopes=[scope.dict() for scope in scopes], - compute_service_id=str(compute_service_id), count=count, protocols=protocols - ) + scopes=[scope.dict() for scope in scopes], + compute_service_id=str(compute_service_id), + count=count, + protocols=protocols, + ) tasks = self._post_resource("/claim", data) return [ScopedKey.from_str(t) if t is not None else None for t in tasks] diff --git a/alchemiscale/compute/service.py b/alchemiscale/compute/service.py index d7ced75a..2955555d 100644 --- a/alchemiscale/compute/service.py +++ b/alchemiscale/compute/service.py @@ -174,10 +174,12 @@ def claim_tasks( """ - tasks = self.client.claim_tasks(scopes=self.scopes, - compute_service_id=self.compute_service_id, - count=count, - protocols=protocols) + tasks = self.client.claim_tasks( + scopes=self.scopes, + compute_service_id=self.compute_service_id, + count=count, + protocols=protocols, + ) return tasks diff --git a/alchemiscale/tests/integration/compute/client/test_compute_client.py b/alchemiscale/tests/integration/compute/client/test_compute_client.py index 389c6adb..185eada4 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_client.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_client.py @@ -195,12 +195,16 @@ def test_claim_tasks( # claim a single task; should get highest priority task task_sks = compute_client.claim_tasks( - scopes=[scope_test], - compute_service_id=compute_service_id, + scopes=[scope_test], + compute_service_id=compute_service_id, ) all_tasks = n4js_preloaded.query_tasks(scope=scope_test) - priorities = {task_sk: priority for task_sk, priority - in zip(all_tasks, n4js_preloaded.get_task_priority(all_tasks))} + priorities = { + task_sk: priority + for task_sk, priority in zip( + all_tasks, n4js_preloaded.get_task_priority(all_tasks) + ) + } assert len(task_sks) == 1 assert task_sks[0] in all_tasks diff --git a/alchemiscale/tests/integration/compute/client/test_compute_service.py b/alchemiscale/tests/integration/compute/client/test_compute_service.py index 9140ab2a..bb097257 100644 --- a/alchemiscale/tests/integration/compute/client/test_compute_service.py +++ b/alchemiscale/tests/integration/compute/client/test_compute_service.py @@ -21,16 +21,17 @@ class TestSynchronousComputeService: def service(self, n4js_preloaded, compute_client, tmpdir): with tmpdir.as_cwd(): return SynchronousComputeService( - ComputeServiceSettings( - api_url=compute_client.api_url, - identifier=compute_client.identifier, - key=compute_client.key, - name="test_compute_service", - shared_basedir=Path("shared").absolute(), - scratch_basedir=Path("scratch").absolute(), - heartbeat_interval=1, - sleep_interval=1, - )) + ComputeServiceSettings( + api_url=compute_client.api_url, + identifier=compute_client.identifier, + key=compute_client.key, + name="test_compute_service", + shared_basedir=Path("shared").absolute(), + scratch_basedir=Path("scratch").absolute(), + heartbeat_interval=1, + sleep_interval=1, + ) + ) def test_heartbeat(self, n4js_preloaded, service): n4js: Neo4jStore = n4js_preloaded From c722da007c59140f4fdf431f81356b3a2824af39 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Thu, 30 May 2024 10:48:55 -0700 Subject: [PATCH 14/22] Added __init__.py files to make modules more importable --- alchemiscale/tests/integration/compute/__init__.py | 0 alchemiscale/tests/integration/compute/client/__init__.py | 0 alchemiscale/tests/integration/interface/__init__.py | 0 alchemiscale/tests/integration/interface/client/__init__.py | 0 alchemiscale/tests/integration/storage/__init__.py | 0 5 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 alchemiscale/tests/integration/compute/__init__.py create mode 100644 alchemiscale/tests/integration/compute/client/__init__.py create mode 100644 alchemiscale/tests/integration/interface/__init__.py create mode 100644 alchemiscale/tests/integration/interface/client/__init__.py create mode 100644 alchemiscale/tests/integration/storage/__init__.py diff --git a/alchemiscale/tests/integration/compute/__init__.py b/alchemiscale/tests/integration/compute/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alchemiscale/tests/integration/compute/client/__init__.py b/alchemiscale/tests/integration/compute/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alchemiscale/tests/integration/interface/__init__.py b/alchemiscale/tests/integration/interface/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alchemiscale/tests/integration/interface/client/__init__.py b/alchemiscale/tests/integration/interface/client/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/alchemiscale/tests/integration/storage/__init__.py b/alchemiscale/tests/integration/storage/__init__.py new file mode 100644 index 00000000..e69de29b From ff507355f749efa3de560594d0ca383c9cda7daa Mon Sep 17 00:00:00 2001 From: David Dotson Date: Thu, 30 May 2024 22:11:41 -0700 Subject: [PATCH 15/22] Small fix to docstrings --- alchemiscale/compute/settings.py | 2 +- devtools/configs/synchronous-compute-settings.yaml | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/alchemiscale/compute/settings.py b/alchemiscale/compute/settings.py index fe77214b..87c80f97 100644 --- a/alchemiscale/compute/settings.py +++ b/alchemiscale/compute/settings.py @@ -58,7 +58,7 @@ class Config: ) protocols: Optional[List[str]] = Field( None, - description="Names of Protocols to run with this service; `None` means no restriction", + description="Names of Protocols to run with this service; `None` means no restriction.", ) claim_limit: int = Field( 1000, description="Maximum number of Tasks to claim at a time from a TaskHub." diff --git a/devtools/configs/synchronous-compute-settings.yaml b/devtools/configs/synchronous-compute-settings.yaml index 1712d444..23a9d9f2 100644 --- a/devtools/configs/synchronous-compute-settings.yaml +++ b/devtools/configs/synchronous-compute-settings.yaml @@ -44,8 +44,7 @@ init: scopes: - '*-*-*' - # Names of Protocols to run with this service; regex patterns are allowed. - # `None` means no restriction + # Names of Protocols to run with this service; `None` means no restriction protocols: null # Maximum number of Tasks to claim at a time from a TaskHub. From 2f4be7d4b6aa7fce7e633895881d214b1f34677e Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 11 Jul 2024 16:01:38 -0700 Subject: [PATCH 16/22] Specify protocols as Protocol subclasses or str --- alchemiscale/storage/statestore.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/alchemiscale/storage/statestore.py b/alchemiscale/storage/statestore.py index fd9c8bbd..fc8b38b2 100644 --- a/alchemiscale/storage/statestore.py +++ b/alchemiscale/storage/statestore.py @@ -1665,7 +1665,7 @@ def claim_taskhub_tasks( taskhub: ScopedKey, compute_service_id: ComputeServiceID, count: int = 1, - protocols: Optional[List[Protocol]] = None, + protocols: Optional[List[Union[Protocol, str]]] = None, ) -> List[Union[ScopedKey, None]]: """Claim a TaskHub Task. @@ -1705,8 +1705,14 @@ def claim_taskhub_tasks( # filter down to `protocols`, if specified if protocols is not None: + # need to extract qualnames if given protocol classes + protocols = [ + protocol.__qualname__ if isinstance(protocol, Protocol) else protocol + for protocol in protocols + ] + q += f""" - MATCH (task)-[:PERFORMS]->(:Transformation)-[:DEPENDS_ON]->(protocol:{cypher_or(protocols)}) + MATCH (task)-[:PERFORMS]->(:Transformation|NonTransformation)-[:DEPENDS_ON]->(protocol:{cypher_or(protocols)}) WITH task, other_task, actions """ From cf904e1315402eef453744fae0538881503939d8 Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 11 Jul 2024 16:03:49 -0700 Subject: [PATCH 17/22] Add new DummyProtocol subclasses * New protocols: DummyProtocolA, DummyProtocolB, DummyProtocolC * network_tyk2 now uses a variety of protocols for testing purposes --- alchemiscale/tests/integration/conftest.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/alchemiscale/tests/integration/conftest.py b/alchemiscale/tests/integration/conftest.py index 1875981e..c05a7374 100644 --- a/alchemiscale/tests/integration/conftest.py +++ b/alchemiscale/tests/integration/conftest.py @@ -221,6 +221,20 @@ def s3os(s3objectstore_settings): # test alchemical networks + +## define varying protocols to simulate protocol variety +class DummyProtocolA(DummyProtocol): + pass + + +class DummyProtocolB(DummyProtocol): + pass + + +class DummyProtocolC(DummyProtocol): + pass + + # TODO: add in atom mapping once `gufe`#35 is settled @@ -251,7 +265,7 @@ def network_tyk2(): Transformation( stateA=complexes[edge[0]], stateB=complexes[edge[1]], - protocol=DummyProtocol(settings=DummyProtocol.default_settings()), + protocol=DummyProtocolA(settings=DummyProtocolA.default_settings()), name=f"{edge[0]}_to_{edge[1]}_complex", ) for edge in tyk2s.connections @@ -260,7 +274,7 @@ def network_tyk2(): Transformation( stateA=solvated[edge[0]], stateB=solvated[edge[1]], - protocol=DummyProtocol(settings=DummyProtocol.default_settings()), + protocol=DummyProtocolB(settings=DummyProtocolB.default_settings()), name=f"{edge[0]}_to_{edge[1]}_solvent", ) for edge in tyk2s.connections @@ -270,7 +284,7 @@ def network_tyk2(): for cs in list(solvated.values()) + list(complexes.values()): nt = NonTransformation( system=cs, - protocol=DummyProtocol(DummyProtocol.default_settings()), + protocol=DummyProtocolC(DummyProtocolC.default_settings()), name=f"f{cs.name}_nt", ) nontransformations.append(nt) From d6170652ed54ad727558fdb2e2bf82843e8c1d4e Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 11 Jul 2024 16:07:06 -0700 Subject: [PATCH 18/22] Extend testing for claim_taskhub_tasks * Test if Tasks can be claimed by Protocol * Test if Tasks can be claimed by many Protocols --- .../integration/storage/test_statestore.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index 2632524b..f09c9d13 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -1324,6 +1324,55 @@ def test_claim_taskhub_tasks(self, n4js: Neo4jStore, network_tyk2, scope_test): claimed6 = n4js.claim_taskhub_tasks(taskhub_sk, csid, count=2) assert claimed6 == [None] * 2 + def test_claim_taskhub_tasks_protocol_split( + self, n4js: Neo4jStore, network_tyk2, scope_test + ): + an = network_tyk2 + network_sk, taskhub_sk, _ = n4js.assemble_network(an, scope_test) + + from ..conftest import DummyProtocolA, DummyProtocolB, DummyProtocolC + from functools import reduce + + def reducer(collection, transformation): + protocol = transformation.protocol.__class__ + if len(collection[protocol]) >= 3: + return collection + sk = n4js.get_scoped_key(transformation, scope_test) + collection[transformation.protocol.__class__].append(sk) + return collection + + transformations = reduce( + reducer, + an.edges, + {DummyProtocolA: [], DummyProtocolB: [], DummyProtocolC: []}, + ) + + transformation_sks = [ + value for _, values in transformations.items() for value in values + ] + + task_sks = n4js.create_tasks(transformation_sks) + assert len(task_sks) == 9 + + # action the tasks + n4js.action_tasks(task_sks, taskhub_sk) + assert len(n4js.get_taskhub_unclaimed_tasks(taskhub_sk)) == 9 + + csid = ComputeServiceID("another task handler") + n4js.register_computeservice(ComputeServiceRegistration.from_now(csid)) + + claimedA = n4js.claim_taskhub_tasks( + taskhub_sk, csid, protocols=["DummyProtocolA"], count=9 + ) + + assert len([sk for sk in claimedA if sk]) == 3 + + claimedBC = n4js.claim_taskhub_tasks( + taskhub_sk, csid, protocols=["DummyProtocolB", "DummyProtocolC"], count=9 + ) + + assert len([sk for sk in claimedBC if sk]) == 6 + def test_claim_taskhub_tasks_deregister( self, n4js: Neo4jStore, network_tyk2, scope_test ): From 83614d92cfe70d6b44ad228ec3333d7c1fc28b3e Mon Sep 17 00:00:00 2001 From: Ian Kenney Date: Thu, 11 Jul 2024 16:17:39 -0700 Subject: [PATCH 19/22] Moved imports out of test method --- alchemiscale/tests/integration/storage/test_statestore.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/alchemiscale/tests/integration/storage/test_statestore.py b/alchemiscale/tests/integration/storage/test_statestore.py index f09c9d13..802eab94 100644 --- a/alchemiscale/tests/integration/storage/test_statestore.py +++ b/alchemiscale/tests/integration/storage/test_statestore.py @@ -3,6 +3,7 @@ from typing import List, Dict from pathlib import Path from itertools import chain +from functools import reduce import pytest from gufe import AlchemicalNetwork @@ -27,6 +28,8 @@ ) from alchemiscale.security.auth import hash_key +from ..conftest import DummyProtocolA, DummyProtocolB, DummyProtocolC + class TestStateStore: ... @@ -1330,9 +1333,6 @@ def test_claim_taskhub_tasks_protocol_split( an = network_tyk2 network_sk, taskhub_sk, _ = n4js.assemble_network(an, scope_test) - from ..conftest import DummyProtocolA, DummyProtocolB, DummyProtocolC - from functools import reduce - def reducer(collection, transformation): protocol = transformation.protocol.__class__ if len(collection[protocol]) >= 3: From 6e84bb96f5c21957723571f0caebb0ef440e266f Mon Sep 17 00:00:00 2001 From: "David L. Dotson" Date: Sun, 14 Jul 2024 22:55:38 -0700 Subject: [PATCH 20/22] Update alchemiscale/compute/api.py Co-authored-by: Ian Kenney --- alchemiscale/compute/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 9c27bf5f..2ba3be88 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -250,7 +250,7 @@ def claim_tasks( # remove this taskhub from the options available; repeat taskhubs.pop(taskhub) - return [str(t) if t is not None else None for t in tasks] + return [str(t) for t in tasks] @router.get("/tasks/{task_scoped_key}/transformation") From 04f8ac582231488f591bdbdb563e95c822d313d2 Mon Sep 17 00:00:00 2001 From: "David L. Dotson" Date: Sun, 14 Jul 2024 22:56:16 -0700 Subject: [PATCH 21/22] Update alchemiscale/compute/api.py Co-authored-by: Ian Kenney --- alchemiscale/compute/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index 2ba3be88..cc6a240b 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -265,7 +265,7 @@ def get_task_transformation( transformation: ScopedKey - transformation, protocoldagresultref = n4js.get_task_transformation( + transformation, _ = n4js.get_task_transformation( task=task_scoped_key, return_gufe=False, ) From 208eeb39481508faffde1b2332c8f7f8ba87a3d4 Mon Sep 17 00:00:00 2001 From: David Dotson Date: Sun, 14 Jul 2024 23:08:36 -0700 Subject: [PATCH 22/22] Adjustments from @ianmkenney review --- alchemiscale/compute/api.py | 2 +- pyproject.toml | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/alchemiscale/compute/api.py b/alchemiscale/compute/api.py index cc6a240b..9337055b 100644 --- a/alchemiscale/compute/api.py +++ b/alchemiscale/compute/api.py @@ -250,7 +250,7 @@ def claim_tasks( # remove this taskhub from the options available; repeat taskhubs.pop(taskhub) - return [str(t) for t in tasks] + return [str(t) for t in tasks] + [None] * (count - len(tasks)) @router.get("/tasks/{task_scoped_key}/transformation") diff --git a/pyproject.toml b/pyproject.toml index ea86f6bf..87551cec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,7 +76,3 @@ distance-dirty = "{base_version}+{distance}.{vcs}{rev}.dirty" method = "git" match = ["*"] default-tag = "0.0.0" - - -#[project.entry-points.pytest11] -#alchemiscale_fixtures = "alchemiscale.tests.integration.conftest"