Skip to content

Commit

Permalink
SynchronousComputeService now properly claims tasks with protocols …
Browse files Browse the repository at this point in the history
…filter

We weren't actually using the `protocols` setting for the
`SynchronousComputeService`, so `Task` claims weren't being filtered if
this was set to anything but `None`.
  • Loading branch information
dotsdl committed Nov 30, 2024
1 parent cec1538 commit 9cb9ce5
Showing 1 changed file with 6 additions and 27 deletions.
33 changes: 6 additions & 27 deletions alchemiscale/compute/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,32 +157,6 @@ def heartbeat(self):
self.beat()
time.sleep(self.heartbeat_interval)

def claim_tasks(
self, count=1, protocols: Optional[List[str]] = None
) -> List[Optional[ScopedKey]]:
"""Get a Task to execute from compute API.
Returns `None` if no Task was available matching service configuration.
Parameters
----------
count
The maximum number of Tasks to claim.
protocols
Protocol names to restrict Task claiming to. `None` means no restriction.
Regex patterns are allowed.
"""

tasks = self.client.claim_tasks(
scopes=self.scopes,
compute_service_id=self.compute_service_id,
count=count,
protocols=protocols,
)

return tasks

def task_to_protocoldag(
self, task: ScopedKey
) -> Tuple[ProtocolDAG, Transformation, Optional[ProtocolDAGResult]]:
Expand Down Expand Up @@ -306,7 +280,12 @@ def cycle(self, max_tasks: Optional[int] = None, max_time: Optional[int] = None)

# claim tasks from the compute API
self.logger.info("Claiming tasks")
tasks: List[ScopedKey] = self.claim_tasks(self.claim_limit)
tasks: List[ScopedKey] = self.client.claim_tasks(
scopes=self.scopes,
compute_service_id=self.compute_service_id,
count=self.claim_limit,
protocols=self.settings.protocols,
)
self.logger.info("Claimed %d tasks", len([t for t in tasks if t is not None]))

# if no tasks claimed, sleep
Expand Down

0 comments on commit 9cb9ce5

Please sign in to comment.